Browse Source

Merge branch 'master' into master

pull/111/head
Ross McFarland 8 years ago
committed by GitHub
parent
commit
4b740a48ff
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1196 additions and 37 deletions
  1. +1
    -1
      .git_hooks_pre-commit
  2. +25
    -0
      CHANGELOG.md
  3. +1
    -0
      README.md
  4. +1
    -1
      octodns/__init__.py
  5. +3
    -1
      octodns/manager.py
  6. +21
    -7
      octodns/provider/base.py
  7. +4
    -2
      octodns/provider/dyn.py
  8. +333
    -0
      octodns/provider/googlecloud.py
  9. +5
    -1
      octodns/provider/ns1.py
  10. +1
    -2
      octodns/provider/route53.py
  11. +64
    -17
      octodns/record.py
  12. +1
    -0
      requirements-dev.txt
  13. +1
    -0
      requirements.txt
  14. +1
    -0
      script/coverage
  15. +1
    -0
      script/test
  16. +12
    -0
      tests/test_octodns_manager.py
  17. +58
    -0
      tests/test_octodns_provider_base.py
  18. +4
    -4
      tests/test_octodns_provider_dnsimple.py
  19. +5
    -0
      tests/test_octodns_provider_dyn.py
  20. +429
    -0
      tests/test_octodns_provider_googlecloud.py
  21. +31
    -0
      tests/test_octodns_provider_ns1.py
  22. +194
    -1
      tests/test_octodns_record.py

+ 1
- 1
.git_hooks_pre-commit View File

@ -6,6 +6,6 @@ HOOKS=`dirname $0`
GIT=`dirname $HOOKS`
ROOT=`dirname $GIT`
source $ROOT/env/bin/activate
. $ROOT/env/bin/activate
$ROOT/script/lint
$ROOT/script/test

+ 25
- 0
CHANGELOG.md View File

@ -1,3 +1,28 @@
## v0.8.8 - 2017-10-24 - Google Cloud DNS, Large TXT Record support
* Added support for "chunking" TXT records where individual values were larger
than 255 chars. This is common with DKIM records involving multiple
providers.
* Added `GoogleCloudProvider`
* Configurable `UnsafePlan` thresholds to allow modification of how many
updates/deletes are allowed before a plan is declared dangerous.
* Manager.dump bug fix around empty zones.
* Prefer use of `.` over `source` in shell scripts
* `DynProvider` warns when it ignores unrecognized traffic directors.
## v0.8.7 - 2017-09-29 - OVH support
Adds an OVH provider.
## v0.8.6 - 2017-09-06 - CAA record type,
Misc fixes and improvments.
* Azure TXT record fix
* PowerDNS api support for https
* Configurable Route53 max retries and max-attempts
* Improved key ordering error message
## v0.8.5 - 2017-07-21 - Azure, NS1 escaping, & large zones
Relatively small delta this go around. No major themes or anything, just steady


+ 1
- 0
README.md View File

@ -153,6 +153,7 @@ The above command pulled the existing data out of Route53 and placed the results
| [CloudflareProvider](/octodns/provider/cloudflare.py) | A, AAAA, CAA, CNAME, MX, NS, SPF, TXT | No | CAA tags restricted |
| [DnsimpleProvider](/octodns/provider/dnsimple.py) | All | No | CAA tags restricted |
| [DynProvider](/octodns/provider/dyn.py) | All | Yes | |
| [GoogleCloudProvider](/octodns/provider/googlecloud.py) | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, TXT | No | |
| [Ns1Provider](/octodns/provider/ns1.py) | All | No | |
| [OVH](/octodns/provider/ovh.py) | A, AAAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT | No | |
| [PowerDnsProvider](/octodns/provider/powerdns.py) | All | No | |


+ 1
- 1
octodns/__init__.py View File

@ -3,4 +3,4 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
__VERSION__ = '0.8.6'
__VERSION__ = '0.8.8'

+ 3
- 1
octodns/manager.py View File

@ -11,7 +11,7 @@ from importlib import import_module
from os import environ
import logging
from .provider.base import BaseProvider
from .provider.base import BaseProvider, Plan
from .provider.yaml import YamlProvider
from .record import Record
from .yaml import safe_load
@ -362,6 +362,8 @@ class Manager(object):
source.populate(zone, lenient=lenient)
plan = target.plan(zone)
if plan is None:
plan = Plan(zone, zone, [])
target.apply(plan)
def validate_configs(self):


+ 21
- 7
octodns/provider/base.py View File

@ -21,10 +21,14 @@ class Plan(object):
MAX_SAFE_DELETE_PCENT = .3
MIN_EXISTING_RECORDS = 10
def __init__(self, existing, desired, changes):
def __init__(self, existing, desired, changes,
update_pcent_threshold=MAX_SAFE_UPDATE_PCENT,
delete_pcent_threshold=MAX_SAFE_DELETE_PCENT):
self.existing = existing
self.desired = desired
self.changes = changes
self.update_pcent_threshold = update_pcent_threshold
self.delete_pcent_threshold = delete_pcent_threshold
change_counts = {
'Create': 0,
@ -55,14 +59,14 @@ class Plan(object):
update_pcent = self.change_counts['Update'] / existing_record_count
delete_pcent = self.change_counts['Delete'] / existing_record_count
if update_pcent > self.MAX_SAFE_UPDATE_PCENT:
if update_pcent > self.update_pcent_threshold:
raise UnsafePlan('Too many updates, {} is over {} percent'
'({}/{})'.format(
update_pcent,
self.MAX_SAFE_UPDATE_PCENT * 100,
self.change_counts['Update'],
existing_record_count))
if delete_pcent > self.MAX_SAFE_DELETE_PCENT:
if delete_pcent > self.delete_pcent_threshold:
raise UnsafePlan('Too many deletes, {} is over {} percent'
'({}/{})'.format(
delete_pcent,
@ -79,11 +83,19 @@ class Plan(object):
class BaseProvider(BaseSource):
def __init__(self, id, apply_disabled=False):
def __init__(self, id, apply_disabled=False,
update_pcent_threshold=Plan.MAX_SAFE_UPDATE_PCENT,
delete_pcent_threshold=Plan.MAX_SAFE_DELETE_PCENT):
super(BaseProvider, self).__init__(id)
self.log.debug('__init__: id=%s, apply_disabled=%s', id,
apply_disabled)
self.log.debug('__init__: id=%s, apply_disabled=%s, '
'update_pcent_threshold=%d, delete_pcent_threshold=%d',
id,
apply_disabled,
update_pcent_threshold,
delete_pcent_threshold)
self.apply_disabled = apply_disabled
self.update_pcent_threshold = update_pcent_threshold
self.delete_pcent_threshold = delete_pcent_threshold
def _include_change(self, change):
'''
@ -124,7 +136,9 @@ class BaseProvider(BaseSource):
changes += extra
if changes:
plan = Plan(existing, desired, changes)
plan = Plan(existing, desired, changes,
self.update_pcent_threshold,
self.delete_pcent_threshold)
self.log.info('plan: %s', plan)
return plan
self.log.info('plan: No changes')


+ 4
- 2
octodns/provider/dyn.py View File

@ -290,7 +290,9 @@ class DynProvider(BaseProvider):
for td in get_all_dsf_services():
try:
fqdn, _type = td.label.split(':', 1)
except ValueError:
except ValueError as e:
self.log.warn("Failed to load TraficDirector '%s': %s",
td.label, e.message)
continue
tds[fqdn][_type] = td
self._traffic_directors = dict(tds)
@ -455,7 +457,7 @@ class DynProvider(BaseProvider):
return [{
'txtdata': v,
'ttl': record.ttl,
} for v in record.values]
} for v in record.chunked_values]
def _kwargs_for_SRV(self, record):
return [{


+ 333
- 0
octodns/provider/googlecloud.py View File

@ -0,0 +1,333 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
import shlex
import time
from logging import getLogger
from uuid import uuid4
from google.cloud import dns
from .base import BaseProvider
from ..record import Record
class GoogleCloudProvider(BaseProvider):
"""
Google Cloud DNS provider
google_cloud:
class: octodns.provider.googlecloud.GoogleCloudProvider
# Credentials file for a service_account or other account can be
# specified with the GOOGLE_APPLICATION_CREDENTIALS environment
# variable. (https://console.cloud.google.com/apis/credentials)
#
# The project to work on (not required)
# project: foobar
#
# The File with the google credentials (not required). If used, the
# "project" parameter needs to be set, else it will fall back to the
# "default credentials"
# credentials_file: ~/google_cloud_credentials_file.json
#
"""
SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NAPTR',
'NS', 'PTR', 'SPF', 'SRV', 'TXT'))
SUPPORTS_GEO = False
CHANGE_LOOP_WAIT = 5
def __init__(self, id, project=None, credentials_file=None,
*args, **kwargs):
if credentials_file:
self.gcloud_client = dns.Client.from_service_account_json(
credentials_file, project=project)
else:
self.gcloud_client = dns.Client(project=project)
# Logger
self.log = getLogger('GoogleCloudProvider[{}]'.format(id))
self.id = id
self._gcloud_zones = {}
super(GoogleCloudProvider, self).__init__(id, *args, **kwargs)
def _apply(self, plan):
"""Required function of manager.py to actually apply a record change.
:param plan: Contains the zones and changes to be made
:type plan: octodns.provider.base.Plan
:type return: void
"""
desired = plan.desired
changes = plan.changes
self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
# Get gcloud zone, or create one if none existed before.
if desired.name not in self.gcloud_zones:
gcloud_zone = self._create_gcloud_zone(desired.name)
else:
gcloud_zone = self.gcloud_zones.get(desired.name)
gcloud_changes = gcloud_zone.changes()
for change in changes:
class_name = change.__class__.__name__
_rrset_func = getattr(
self, '_rrset_for_{}'.format(change.record._type))
if class_name == 'Create':
gcloud_changes.add_record_set(
_rrset_func(gcloud_zone, change.record))
elif class_name == 'Delete':
gcloud_changes.delete_record_set(
_rrset_func(gcloud_zone, change.record))
elif class_name == 'Update':
gcloud_changes.delete_record_set(
_rrset_func(gcloud_zone, change.existing))
gcloud_changes.add_record_set(
_rrset_func(gcloud_zone, change.new))
else:
raise RuntimeError('Change type "{}" for change "{!s}" '
'is none of "Create", "Delete" or "Update'
.format(class_name, change))
gcloud_changes.create()
for i in range(120):
gcloud_changes.reload()
# https://cloud.google.com/dns/api/v1/changes#resource
# status can be one of either "pending" or "done"
if gcloud_changes.status != 'pending':
break
self.log.debug("Waiting for changes to complete")
time.sleep(self.CHANGE_LOOP_WAIT)
if gcloud_changes.status != 'done':
raise RuntimeError("Timeout reached after {} seconds".format(
i * self.CHANGE_LOOP_WAIT))
def _create_gcloud_zone(self, dns_name):
"""Creates a google cloud ManagedZone with dns_name, and zone named
derived from it. calls .create() method and returns it.
:param dns_name: fqdn of zone to create
:type dns_name: str
:type return: new google.cloud.dns.ManagedZone
"""
# Zone name must begin with a letter, end with a letter or digit,
# and only contain lowercase letters, digits or dashes
zone_name = '{}-{}'.format(
dns_name[:-1].replace('.', '-'), uuid4().hex)
gcloud_zone = self.gcloud_client.zone(
name=zone_name,
dns_name=dns_name
)
gcloud_zone.create(client=self.gcloud_client)
# add this new zone to the list of zones.
self._gcloud_zones[gcloud_zone.dns_name] = gcloud_zone
self.log.info("Created zone {}. Fqdn {}.".format(zone_name, dns_name))
return gcloud_zone
def _get_gcloud_records(self, gcloud_zone, page_token=None):
""" Generator function which yields ResourceRecordSet for the managed
gcloud zone, until there are no more records to pull.
:param gcloud_zone: zone to pull records from
:type gcloud_zone: google.cloud.dns.ManagedZone
:param page_token: page token for the page to get
:return: a resource record set
:type return: google.cloud.dns.ResourceRecordSet
"""
gcloud_iterator = gcloud_zone.list_resource_record_sets(
page_token=page_token)
for gcloud_record in gcloud_iterator:
yield gcloud_record
# This is to get results which may be on a "paged" page.
# (if more than max_results) entries.
if gcloud_iterator.next_page_token:
for gcloud_record in self._get_gcloud_records(
gcloud_zone, gcloud_iterator.next_page_token):
# yield from is in python 3 only.
yield gcloud_record
def _get_cloud_zones(self, page_token=None):
"""Load all ManagedZones into the self._gcloud_zones dict which is
mapped with the dns_name as key.
:return: void
"""
gcloud_zones = self.gcloud_client.list_zones(page_token=page_token)
for gcloud_zone in gcloud_zones:
self._gcloud_zones[gcloud_zone.dns_name] = gcloud_zone
if gcloud_zones.next_page_token:
self._get_cloud_zones(gcloud_zones.next_page_token)
@property
def gcloud_zones(self):
if not self._gcloud_zones:
self._get_cloud_zones()
return self._gcloud_zones
def populate(self, zone, target=False, lenient=False):
"""Required function of manager.py to collect records from zone.
:param zone: A dns zone
:type zone: octodns.zone.Zone
:param target: Unused.
:type target: bool
:param lenient: Unused. Check octodns.manager for usage.
:type lenient: bool
:type return: void
"""
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
gcloud_zone = self.gcloud_zones.get(zone.name)
if gcloud_zone:
for gcloud_record in self._get_gcloud_records(gcloud_zone):
if gcloud_record.record_type.upper() not in self.SUPPORTS:
continue
record_name = gcloud_record.name
if record_name.endswith(zone.name):
# google cloud always return fqdn. Make relative record
# here. "root" records will then get the '' record_name,
# which is also the way octodns likes it.
record_name = record_name[:-(len(zone.name) + 1)]
typ = gcloud_record.record_type.upper()
data = getattr(self, '_data_for_{}'.format(typ))
data = data(gcloud_record)
data['type'] = typ
data['ttl'] = gcloud_record.ttl
self.log.debug('populate: adding record {} records: {!s}'
.format(record_name, data))
record = Record.new(zone, record_name, data, source=self)
zone.add_record(record)
self.log.info('populate: found %s records', len(zone.records) - before)
def _data_for_A(self, gcloud_record):
return {
'values': gcloud_record.rrdatas
}
_data_for_AAAA = _data_for_A
def _data_for_CAA(self, gcloud_record):
return {
'values': [{
'flags': v[0],
'tag': v[1],
'value': v[2]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
def _data_for_CNAME(self, gcloud_record):
return {
'value': gcloud_record.rrdatas[0]
}
def _data_for_MX(self, gcloud_record):
return {'values': [{
"preference": v[0],
"exchange": v[1]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
def _data_for_NAPTR(self, gcloud_record):
return {'values': [{
'order': v[0],
'preference': v[1],
'flags': v[2],
'service': v[3],
'regexp': v[4],
'replacement': v[5]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
_data_for_NS = _data_for_A
_data_for_PTR = _data_for_CNAME
def _data_for_SPF(self, gcloud_record):
if len(gcloud_record.rrdatas) > 1:
return {
'values': gcloud_record.rrdatas}
return {
'value': gcloud_record.rrdatas[0]}
def _data_for_SRV(self, gcloud_record):
return {'values': [{
'priority': v[0],
'weight': v[1],
'port': v[2],
'target': v[3]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
_data_for_TXT = _data_for_SPF
def _rrset_for_A(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, record.values)
_rrset_for_AAAA = _rrset_for_A
def _rrset_for_CAA(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
'{} {} {}'.format(v.flags, v.tag, v.value)
for v in record.values])
def _rrset_for_CNAME(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [record.value])
def _rrset_for_MX(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
'{} {}'.format(v.preference, v.exchange)
for v in record.values])
def _rrset_for_NAPTR(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
'{} {} "{}" "{}" "{}" {}'.format(
v.order, v.preference, v.flags, v.service,
v.regexp, v.replacement) for v in record.values])
_rrset_for_NS = _rrset_for_A
_rrset_for_PTR = _rrset_for_CNAME
def _rrset_for_SPF(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, record.chunked_values)
def _rrset_for_SRV(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
'{} {} {} {}'
.format(v.priority, v.weight, v.port, v.target)
for v in record.values])
_rrset_for_TXT = _rrset_for_SPF

+ 5
- 1
octodns/provider/ns1.py View File

@ -69,10 +69,14 @@ class Ns1Provider(BaseProvider):
}
def _data_for_CNAME(self, _type, record):
try:
value = record['short_answers'][0]
except IndexError:
value = None
return {
'ttl': record['ttl'],
'type': _type,
'value': record['short_answers'][0],
'value': value,
}
_data_for_ALIAS = _data_for_CNAME


+ 1
- 2
octodns/provider/route53.py View File

@ -114,8 +114,7 @@ class _Route53Record(object):
for v in record.values]
def _values_for_quoted(self, record):
return ['"{}"'.format(v.replace('"', '\\"'))
for v in record.values]
return record.chunked_values
_values_for_SPF = _values_for_quoted
_values_for_TXT = _values_for_quoted


+ 64
- 17
octodns/record.py View File

@ -95,6 +95,10 @@ class Record(object):
except KeyError:
raise Exception('Unknown record type: "{}"'.format(_type))
reasons = _class.validate(name, data)
try:
lenient |= data['octodns']['lenient']
except KeyError:
pass
if reasons:
if lenient:
cls.log.warn(ValidationError.build_message(fqdn, reasons))
@ -209,9 +213,30 @@ class _ValuesMixin(object):
values = []
try:
values = data['values']
if not values:
values = []
reasons.append('missing value(s)')
else:
# loop through copy of values
# remove invalid value from values
for value in list(values):
if value is None:
reasons.append('missing value(s)')
values.remove(value)
elif len(value) == 0:
reasons.append('empty value')
values.remove(value)
except KeyError:
try:
values = [data['value']]
value = data['value']
if value is None:
reasons.append('missing value(s)')
values = []
elif len(value) == 0:
reasons.append('empty value')
values = []
else:
values = [value]
except KeyError:
reasons.append('missing value(s)')
@ -236,10 +261,16 @@ class _ValuesMixin(object):
def _data(self):
ret = super(_ValuesMixin, self)._data()
if len(self.values) > 1:
ret['values'] = [getattr(v, 'data', v) for v in self.values]
else:
values = [getattr(v, 'data', v) for v in self.values if v]
if len(values) > 1:
ret['values'] = values
elif len(values) == 1:
ret['value'] = values[0]
elif len(self.values) == 1:
v = self.values[0]
ret['value'] = getattr(v, 'data', v)
if v:
ret['value'] = getattr(v, 'data', v)
return ret
def __repr__(self):
@ -347,6 +378,10 @@ class _ValueMixin(object):
value = None
try:
value = data['value']
if value is None:
reasons.append('missing value')
elif value == '':
reasons.append('empty value')
except KeyError:
reasons.append('missing value')
if value:
@ -364,7 +399,8 @@ class _ValueMixin(object):
def _data(self):
ret = super(_ValueMixin, self)._data()
ret['value'] = getattr(self.value, 'data', self.value)
if self.value:
ret['value'] = getattr(self.value, 'data', self.value)
return ret
def __repr__(self):
@ -706,8 +742,8 @@ class SshfpRecord(_ValuesMixin, Record):
_unescaped_semicolon_re = re.compile(r'\w;')
class SpfRecord(_ValuesMixin, Record):
_type = 'SPF'
class _ChunkedValuesMixin(_ValuesMixin):
CHUNK_SIZE = 255
@classmethod
def _validate_value(cls, value):
@ -716,9 +752,29 @@ class SpfRecord(_ValuesMixin, Record):
return []
def _process_values(self, values):
ret = []
for v in values:
if v and v[0] == '"':
v = v[1:-1]
ret.append(v.replace('" "', ''))
return ret
@property
def chunked_values(self):
values = []
for v in self.values:
v = v.replace('"', '\\"')
vs = [v[i:i + self.CHUNK_SIZE]
for i in range(0, len(v), self.CHUNK_SIZE)]
vs = '" "'.join(vs)
values.append('"{}"'.format(vs))
return values
class SpfRecord(_ChunkedValuesMixin, Record):
_type = 'SPF'
class SrvValue(object):
@classmethod
@ -799,14 +855,5 @@ class SrvRecord(_ValuesMixin, Record):
return [SrvValue(v) for v in values]
class TxtRecord(_ValuesMixin, Record):
class TxtRecord(_ChunkedValuesMixin, Record):
_type = 'TXT'
@classmethod
def _validate_value(cls, value):
if _unescaped_semicolon_re.search(value):
return ['unescaped ;']
return []
def _process_values(self, values):
return values

+ 1
- 0
requirements-dev.txt View File

@ -4,3 +4,4 @@ nose
pep8
pyflakes
requests_mock
setuptools>=36.4.0

+ 1
- 0
requirements.txt View File

@ -9,6 +9,7 @@ dnspython==1.15.0
docutils==0.14
dyn==1.8.0
futures==3.1.1
google-cloud==0.27.0
incf.countryutils==1.0
ipaddress==1.0.18
jmespath==0.9.3


+ 1
- 0
script/coverage View File

@ -24,6 +24,7 @@ export DNSIMPLE_TOKEN=
export DYN_CUSTOMER=
export DYN_PASSWORD=
export DYN_USERNAME=
export GOOGLE_APPLICATION_CREDENTIALS=
coverage run --branch --source=octodns `which nosetests` --with-xunit "$@"
coverage html


+ 1
- 0
script/test View File

@ -24,5 +24,6 @@ export DNSIMPLE_TOKEN=
export DYN_CUSTOMER=
export DYN_PASSWORD=
export DYN_USERNAME=
export GOOGLE_APPLICATION_CREDENTIALS=
nosetests "$@"

+ 12
- 0
tests/test_octodns_manager.py View File

@ -11,6 +11,7 @@ from unittest import TestCase
from octodns.record import Record
from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager
from octodns.yaml import safe_load
from octodns.zone import Zone
from helpers import GeoProvider, NoSshFpProvider, SimpleProvider, \
@ -211,6 +212,17 @@ class TestManager(TestCase):
with self.assertRaises(IOError):
manager.dump('unknown.zone.', tmpdir.dirname, False, 'in')
def test_dump_empty(self):
with TemporaryDirectory() as tmpdir:
environ['YAML_TMP_DIR'] = tmpdir.dirname
manager = Manager(get_config_filename('simple.yaml'))
manager.dump('empty.', tmpdir.dirname, False, 'in')
with open(join(tmpdir.dirname, 'empty.yaml')) as fh:
data = safe_load(fh, False)
self.assertFalse(data)
def test_validate_configs(self):
Manager(get_config_filename('simple-validate.yaml')).validate_configs()


+ 58
- 0
tests/test_octodns_provider_base.py View File

@ -24,6 +24,8 @@ class HelperProvider(BaseProvider):
self.__extra_changes = extra_changes
self.apply_disabled = apply_disabled
self.include_change_callback = include_change_callback
self.update_pcent_threshold = Plan.MAX_SAFE_UPDATE_PCENT
self.delete_pcent_threshold = Plan.MAX_SAFE_DELETE_PCENT
def populate(self, zone, target=False, lenient=False):
pass
@ -289,3 +291,59 @@ class TestBaseProvider(TestCase):
Plan.MAX_SAFE_DELETE_PCENT))]
Plan(zone, zone, changes).raise_if_unsafe()
def test_safe_updates_min_existing_override(self):
safe_pcent = .4
# 40% + 1 fails when more
# than MIN_EXISTING_RECORDS exist
zone = Zone('unit.tests.', [])
record = Record.new(zone, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
}))
changes = [Update(record, record)
for i in range(int(Plan.MIN_EXISTING_RECORDS *
safe_pcent) + 1)]
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes,
update_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many updates' in ctx.exception.message)
def test_safe_deletes_min_existing_override(self):
safe_pcent = .4
# 40% + 1 fails when more
# than MIN_EXISTING_RECORDS exist
zone = Zone('unit.tests.', [])
record = Record.new(zone, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
}))
changes = [Delete(record)
for i in range(int(Plan.MIN_EXISTING_RECORDS *
safe_pcent) + 1)]
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes,
delete_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many deletes' in ctx.exception.message)

+ 4
- 4
tests/test_octodns_provider_dnsimple.py View File

@ -96,23 +96,23 @@ class TestDnsimpleProvider(TestCase):
mock.get(ANY, text=fh.read())
zone = Zone('unit.tests.', [])
provider.populate(zone)
provider.populate(zone, lenient=True)
self.assertEquals(set([
Record.new(zone, '', {
'ttl': 3600,
'type': 'SSHFP',
'values': []
}),
}, lenient=True),
Record.new(zone, '_srv._tcp', {
'ttl': 600,
'type': 'SRV',
'values': []
}),
}, lenient=True),
Record.new(zone, 'naptr', {
'ttl': 600,
'type': 'NAPTR',
'values': []
}),
}, lenient=True),
]), zone.records)
def test_apply(self):


+ 5
- 0
tests/test_octodns_provider_dyn.py View File

@ -601,6 +601,7 @@ class TestDynProviderGeo(TestCase):
provider = DynProvider('test', 'cust', 'user', 'pass', True)
# short-circuit session checking
provider._dyn_sess = True
provider.log.warn = MagicMock()
# no tds
mock.side_effect = [{'data': []}]
@ -649,6 +650,10 @@ class TestDynProviderGeo(TestCase):
set(tds.keys()))
self.assertEquals(['A'], tds['unit.tests.'].keys())
self.assertEquals(['A'], tds['geo.unit.tests.'].keys())
provider.log.warn.assert_called_with("Failed to load TraficDirector "
"'%s': %s", 'something else',
'need more than 1 value to '
'unpack')
@patch('dyn.core.SessionEngine.execute')
def test_traffic_director_monitor(self, mock):


+ 429
- 0
tests/test_octodns_provider_googlecloud.py View File

@ -0,0 +1,429 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
'values': ['1.1.1.3']}))
octo_records.append(Record.new(zone, 'cname', {
'ttl': 3,
'type': 'CNAME',
'value': 'a.unit.tests.'}))
octo_records.append(Record.new(zone, 'mx1', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}, {
'priority': 20,
'value': 'mx2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'mx2', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '', {
'ttl': 4,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
octo_records.append(Record.new(zone, 'foo', {
'ttl': 5,
'type': 'NS',
'value': 'ns1.unit.tests.'}))
octo_records.append(Record.new(zone, '_srv._tcp', {
'ttl': 6,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 12,
'weight': 30,
'port': 30,
'target': 'foo-2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '_srv2._tcp', {
'ttl': 7,
'type': 'SRV',
'values': [{
'priority': 12,
'weight': 17,
'port': 1,
'target': 'srvfoo.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'txt1', {
'ttl': 8,
'type': 'TXT',
'value': 'txt singleton test'}))
octo_records.append(Record.new(zone, 'txt2', {
'ttl': 9,
'type': 'TXT',
'values': ['txt multiple test', 'txt multiple test 2']}))
octo_records.append(Record.new(zone, 'naptr', {
'ttl': 9,
'type': 'NAPTR',
'values': [{
'order': 100,
'preference': 10,
'flags': 'S',
'service': 'SIP+D2U',
'regexp': "!^.*$!sip:customer-service@unit.tests!",
'replacement': '_sip._udp.unit.tests.'
}]}))
octo_records.append(Record.new(zone, 'caa', {
'ttl': 9,
'type': 'CAA',
'value': {
'flags': 0,
'tag': 'issue',
'value': 'ca.unit.tests',
}}))
for record in octo_records:
zone.add_record(record)
# This is the format which the google API likes.
resource_record_sets = [
('unit.tests.', u'A', 0, [u'1.2.3.4', u'10.10.10.10']),
(u'a.unit.tests.', u'A', 1, [u'1.1.1.1', u'1.2.3.4']),
(u'aa.unit.tests.', u'A', 9001, [u'1.2.4.3']),
(u'aaa.unit.tests.', u'A', 2, [u'1.1.1.3']),
(u'cname.unit.tests.', u'CNAME', 3, [u'a.unit.tests.']),
(u'mx1.unit.tests.', u'MX', 3,
[u'10 mx1.unit.tests.', u'20 mx2.unit.tests.']),
(u'mx2.unit.tests.', u'MX', 3, [u'10 mx1.unit.tests.']),
('unit.tests.', u'NS', 4, [u'ns1.unit.tests.', u'ns2.unit.tests.']),
(u'foo.unit.tests.', u'NS', 5, [u'ns1.unit.tests.']),
(u'_srv._tcp.unit.tests.', u'SRV', 6,
[u'10 20 30 foo-1.unit.tests.', u'12 30 30 foo-2.unit.tests.']),
(u'_srv2._tcp.unit.tests.', u'SRV', 7, [u'12 17 1 srvfoo.unit.tests.']),
(u'txt1.unit.tests.', u'TXT', 8, [u'txt singleton test']),
(u'txt2.unit.tests.', u'TXT', 9,
[u'txt multiple test', u'txt multiple test 2']),
(u'naptr.unit.tests.', u'NAPTR', 9, [
u'100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"'
u' _sip._udp.unit.tests.']),
(u'caa.unit.tests.', u'CAA', 9, [u'0 issue ca.unit.tests'])
]
class DummyResourceRecordSet:
def __init__(self, record_name, record_type, ttl, rrdatas):
self.name = record_name
self.record_type = record_type
self.ttl = ttl
self.rrdatas = rrdatas
def __eq__(self, other):
try:
return self.name == other.name \
and self.record_type == other.record_type \
and self.ttl == other.ttl \
and sorted(self.rrdatas) == sorted(other.rrdatas)
except:
return False
def __repr__(self):
return "{} {} {} {!s}"\
.format(self.name, self.record_type, self.ttl, self.rrdatas)
def __hash__(self):
return hash(repr(self))
class DummyGoogleCloudZone:
def __init__(self, dns_name, name=""):
self.dns_name = dns_name
self.name = name
def resource_record_set(self, *args):
return DummyResourceRecordSet(*args)
def list_resource_record_sets(self, *args):
pass
def create(self, *args, **kwargs):
pass
class DummyIterator:
"""Returns a mock DummyIterator object to use in testing.
This is because API calls for google cloud DNS, if paged, contains a
"next_page_token", which can be used to grab a subsequent
iterator with more results.
:type return: DummyIterator
"""
def __init__(self, list_of_stuff, page_token=None):
self.iterable = iter(list_of_stuff)
self.next_page_token = page_token
def __iter__(self):
return self
def next(self):
return self.iterable.next()
class TestGoogleCloudProvider(TestCase):
@patch('octodns.provider.googlecloud.dns')
def _get_provider(*args):
'''Returns a mock GoogleCloudProvider object to use in testing.
:type return: GoogleCloudProvider
'''
return GoogleCloudProvider(id=1, project="mock")
@patch('octodns.provider.googlecloud.dns')
def test___init__(self, *_):
self.assertIsInstance(GoogleCloudProvider(id=1,
credentials_file="test",
project="unit test"),
BaseProvider)
self.assertIsInstance(GoogleCloudProvider(id=1),
BaseProvider)
@patch('octodns.provider.googlecloud.time.sleep')
@patch('octodns.provider.googlecloud.dns')
def test__apply(self, *_):
class DummyDesired:
def __init__(self, name, changes):
self.name = name
self.changes = changes
apply_z = Zone("unit.tests.", [])
create_r = Record.new(apply_z, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']})
delete_r = Record.new(apply_z, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']})
update_existing_r = Record.new(apply_z, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']})
update_new_r = Record.new(apply_z, 'aa', {
'ttl': 666,
'type': 'A',
'values': ['1.4.3.2']})
gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.", "unit-tests")
status_mock = Mock()
return_values_for_status = iter(
["pending"] * 11 + ['done', 'done'])
type(status_mock).status = PropertyMock(
side_effect=return_values_for_status.next)
gcloud_zone_mock.changes = Mock(return_value=status_mock)
provider = self._get_provider()
provider.gcloud_client = Mock()
provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
desired = Mock()
desired.name = "unit.tests."
changes = []
changes.append(Create(create_r))
changes.append(Delete(delete_r))
changes.append(Update(existing=update_existing_r, new=update_new_r))
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes
))
calls_mock = gcloud_zone_mock.changes.return_value
mocked_calls = []
for mock_call in calls_mock.add_record_set.mock_calls:
mocked_calls.append(mock_call[1][0])
self.assertEqual(mocked_calls, [
DummyResourceRecordSet(
'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 666, ['1.4.3.2'])
])
mocked_calls2 = []
for mock_call in calls_mock.delete_record_set.mock_calls:
mocked_calls2.append(mock_call[1][0])
self.assertEqual(mocked_calls2, [
DummyResourceRecordSet(
'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 9001, ['1.2.4.3'])
])
type(status_mock).status = "pending"
with self.assertRaises(RuntimeError):
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes
))
unsupported_change = Mock()
unsupported_change.__len__ = Mock(return_value=1)
type_mock = Mock()
type_mock._type = "A"
unsupported_change.record = type_mock
mock_plan = Mock()
type(mock_plan).desired = PropertyMock(return_value=DummyDesired(
"dummy name", []))
type(mock_plan).changes = [unsupported_change]
with self.assertRaises(RuntimeError):
provider.apply(mock_plan)
def test__get_gcloud_client(self):
provider = self._get_provider()
self.assertIsInstance(provider, GoogleCloudProvider)
@patch('octodns.provider.googlecloud.dns')
def test_populate(self, _):
def _get_mock_zones(page_token=None):
if not page_token:
return DummyIterator([
DummyGoogleCloudZone('example.com.'),
], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator([
DummyGoogleCloudZone('example2.com.'),
], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator([
google_cloud_zone
])
def _get_mock_record_sets(page_token=None):
if not page_token:
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[:3]], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[3:5]], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator(
[DummyResourceRecordSet(*v) for v in resource_record_sets[5:]])
google_cloud_zone = DummyGoogleCloudZone('unit.tests.')
provider = self._get_provider()
provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones)
google_cloud_zone.list_resource_record_sets = Mock(
side_effect=_get_mock_record_sets)
self.assertEqual(provider.gcloud_zones.get("unit.tests.").dns_name,
"unit.tests.")
test_zone = Zone('unit.tests.', [])
provider.populate(test_zone)
# test_zone gets fed the same records as zone does, except it's in
# the format returned by google API, so after populate they should look
# excactly the same.
self.assertEqual(test_zone.records, zone.records)
test_zone2 = Zone('nonexistant.zone.', [])
provider.populate(test_zone2, False, False)
self.assertEqual(len(test_zone2.records), 0,
msg="Zone should not get records from wrong domain")
provider.SUPPORTS = set()
test_zone3 = Zone('unit.tests.', [])
provider.populate(test_zone3)
self.assertEqual(len(test_zone3.records), 0)
@patch('octodns.provider.googlecloud.dns')
def test_populate_corner_cases(self, _):
provider = self._get_provider()
test_zone = Zone('unit.tests.', [])
not_same_fqdn = DummyResourceRecordSet(
'unit.tests.gr', u'A', 0, [u'1.2.3.4']),
provider._get_gcloud_records = Mock(
side_effect=[not_same_fqdn])
provider._gcloud_zones = {
"unit.tests.": DummyGoogleCloudZone("unit.tests.", "unit-tests")}
provider.populate(test_zone)
self.assertEqual(len(test_zone.records), 1)
self.assertEqual(test_zone.records.pop().fqdn,
u'unit.tests.gr.unit.tests.')
def test__get_gcloud_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
self.assertIsNone(provider.gcloud_zones.get("nonexistant.xone"),
msg="Check that nonexistant zones return None when"
"there's no create=True flag")
def test__get_rrsets(self):
provider = self._get_provider()
dummy_gcloud_zone = DummyGoogleCloudZone("unit.tests")
for octo_record in octo_records:
_rrset_func = getattr(
provider, '_rrset_for_{}'.format(octo_record._type))
self.assertEqual(
_rrset_func(dummy_gcloud_zone, octo_record).record_type,
octo_record._type
)
def test__create_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
mock_zone = provider._create_gcloud_zone("nonexistant.zone.mock")
mock_zone.create.assert_called()
provider.gcloud_client.zone.assert_called()

+ 31
- 0
tests/test_octodns_provider_ns1.py View File

@ -326,3 +326,34 @@ class TestNs1Provider(TestCase):
})
self.assertEquals(['foo; bar baz; blip'],
provider._params_for_TXT(record)['answers'])
def test_data_for_CNAME(self):
provider = Ns1Provider('test', 'api-key')
# answers from nsone
a_record = {
'ttl': 31,
'type': 'CNAME',
'short_answers': ['foo.unit.tests.']
}
a_expected = {
'ttl': 31,
'type': 'CNAME',
'value': 'foo.unit.tests.'
}
self.assertEqual(a_expected,
provider._data_for_CNAME(a_record['type'], a_record))
# no answers from nsone
b_record = {
'ttl': 32,
'type': 'CNAME',
'short_answers': []
}
b_expected = {
'ttl': 32,
'type': 'CNAME',
'value': None
}
self.assertEqual(b_expected,
provider._data_for_CNAME(b_record['type'], b_record))

+ 194
- 1
tests/test_octodns_record.py View File

@ -96,6 +96,57 @@ class TestRecord(TestCase):
DummyRecord().__repr__()
def test_values_mixin_data(self):
# no values, no value or values in data
a = ARecord(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': []
})
self.assertNotIn('values', a.data)
# empty value, no value or values in data
b = ARecord(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': ['']
})
self.assertNotIn('value', b.data)
# empty/None values, no value or values in data
c = ARecord(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': ['', None]
})
self.assertNotIn('values', c.data)
# empty/None values and valid, value in data
c = ARecord(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': ['', None, '10.10.10.10']
})
self.assertNotIn('values', c.data)
self.assertEqual('10.10.10.10', c.data['value'])
def test_value_mixin_data(self):
# unspecified value, no value in data
a = AliasRecord(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': None
})
self.assertNotIn('value', a.data)
# unspecified value, no value in data
a = AliasRecord(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': ''
})
self.assertNotIn('value', a.data)
def test_geo(self):
geo_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'AF': ['1.1.1.1'],
@ -733,6 +784,16 @@ class TestRecordValidation(TestCase):
}, lenient=True)
self.assertEquals(('value',), ctx.exception.args)
# no exception if we're in lenient mode from config
Record.new(self.zone, 'www', {
'octodns': {
'lenient': True
},
'type': 'A',
'ttl': -1,
'value': '1.2.3.4',
}, lenient=True)
def test_A_and_values_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {
@ -740,6 +801,13 @@ class TestRecordValidation(TestCase):
'ttl': 600,
'value': '1.2.3.4',
})
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': [
'1.2.3.4',
]
})
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
@ -749,13 +817,60 @@ class TestRecordValidation(TestCase):
]
})
# missing value(s)
# missing value(s), no value or value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing value(s), empty values
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': 600,
'values': []
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing value(s), None values
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': 600,
'values': None
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing value(s) and empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': 600,
'values': [None, '']
})
self.assertEquals(['missing value(s)',
'empty value'], ctx.exception.reasons)
# missing value(s), None value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': 600,
'value': None
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# empty value, empty string value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': 600,
'value': ''
})
self.assertEquals(['empty value'], ctx.exception.reasons)
# missing value(s) & ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
@ -912,6 +1027,24 @@ class TestRecordValidation(TestCase):
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'ALIAS',
'ttl': 600,
'value': None
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'ALIAS',
'ttl': 600,
'value': ''
})
self.assertEquals(['empty value'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
@ -1490,3 +1623,63 @@ class TestRecordValidation(TestCase):
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)
def test_TXT_long_value_chunking(self):
expected = '"Lorem ipsum dolor sit amet, consectetur adipiscing ' \
'elit, sed do eiusmod tempor incididunt ut labore et dolore ' \
'magna aliqua. Ut enim ad minim veniam, quis nostrud ' \
'exercitation ullamco laboris nisi ut aliquip ex ea commodo ' \
'consequat. Duis aute irure dolor i" "n reprehenderit in ' \
'voluptate velit esse cillum dolore eu fugiat nulla pariatur. ' \
'Excepteur sint occaecat cupidatat non proident, sunt in culpa ' \
'qui officia deserunt mollit anim id est laborum."'
long_value = 'Lorem ipsum dolor sit amet, consectetur adipiscing ' \
'elit, sed do eiusmod tempor incididunt ut labore et dolore ' \
'magna aliqua. Ut enim ad minim veniam, quis nostrud ' \
'exercitation ullamco laboris nisi ut aliquip ex ea commodo ' \
'consequat. Duis aute irure dolor in reprehenderit in ' \
'voluptate velit esse cillum dolore eu fugiat nulla ' \
'pariatur. Excepteur sint occaecat cupidatat non proident, ' \
'sunt in culpa qui officia deserunt mollit anim id est ' \
'laborum.'
# Single string
single = Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
long_value,
'this has some\; semi-colons\; in it',
]
})
self.assertEquals(3, len(single.values))
self.assertEquals(3, len(single.chunked_values))
# Note we are checking that this normalizes the chunking, not that we
# get out what we put in.
self.assertEquals(expected, single.chunked_values[0])
long_split_value = '"Lorem ipsum dolor sit amet, consectetur ' \
'adipiscing elit, sed do eiusmod tempor incididunt ut ' \
'labore et dolore magna aliqua. Ut enim ad minim veniam, ' \
'quis nostrud exercitation ullamco laboris nisi ut aliquip ' \
'ex" " ea commodo consequat. Duis aute irure dolor in ' \
'reprehenderit in voluptate velit esse cillum dolore eu ' \
'fugiat nulla pariatur. Excepteur sint occaecat cupidatat ' \
'non proident, sunt in culpa qui officia deserunt mollit ' \
'anim id est laborum."'
# Chunked
chunked = Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'"hello world"',
long_split_value,
'"this has some\; semi-colons\; in it"',
]
})
self.assertEquals(expected, chunked.chunked_values[0])
# should be single values, no quoting
self.assertEquals(single.values, chunked.values)
# should be chunked values, with quoting
self.assertEquals(single.chunked_values, chunked.chunked_values)

Loading…
Cancel
Save