|
|
|
@ -20,7 +20,7 @@ from octodns.zone import Zone |
|
|
|
class HelperProvider(BaseProvider): |
|
|
|
log = getLogger('HelperProvider') |
|
|
|
|
|
|
|
SUPPORTS = set(('A', 'PTR')) |
|
|
|
SUPPORTS = set(('A', 'NS', 'PTR')) |
|
|
|
SUPPORTS_MULTIVALUE_PTR = False |
|
|
|
SUPPORTS_DYNAMIC = False |
|
|
|
|
|
|
|
@ -36,7 +36,7 @@ class HelperProvider(BaseProvider): |
|
|
|
self.delete_pcent_threshold = Plan.MAX_SAFE_DELETE_PCENT |
|
|
|
|
|
|
|
def populate(self, zone, target=False, lenient=False): |
|
|
|
pass |
|
|
|
return True |
|
|
|
|
|
|
|
def _include_change(self, change): |
|
|
|
return not self.include_change_callback or \ |
|
|
|
@ -167,6 +167,28 @@ class TestBaseProvider(TestCase): |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(1, len(plan.changes)) |
|
|
|
|
|
|
|
def test_plan_with_process_desired_zone_kwarg_fallback(self): |
|
|
|
ignored = Zone('unit.tests.', []) |
|
|
|
|
|
|
|
class OldApiProvider(HelperProvider): |
|
|
|
|
|
|
|
def _process_desired_zone(self, desired): |
|
|
|
return desired |
|
|
|
|
|
|
|
# No change, thus no plan |
|
|
|
provider = OldApiProvider([]) |
|
|
|
self.assertEqual(None, provider.plan(ignored)) |
|
|
|
|
|
|
|
class OtherTypeErrorProvider(HelperProvider): |
|
|
|
|
|
|
|
def _process_desired_zone(self, desired, exists=False): |
|
|
|
raise TypeError('foo') |
|
|
|
|
|
|
|
provider = OtherTypeErrorProvider([]) |
|
|
|
with self.assertRaises(TypeError) as ctx: |
|
|
|
provider.plan(ignored) |
|
|
|
self.assertEqual('foo', str(ctx.exception)) |
|
|
|
|
|
|
|
def test_plan_with_unsupported_type(self): |
|
|
|
zone = Zone('unit.tests.', []) |
|
|
|
|
|
|
|
@ -229,6 +251,25 @@ class TestBaseProvider(TestCase): |
|
|
|
self.assertEqual(zone.name, another.existing.name) |
|
|
|
self.assertEqual(1, len(another.existing.records)) |
|
|
|
|
|
|
|
def test_plan_with_root_ns(self): |
|
|
|
zone = Zone('unit.tests.', []) |
|
|
|
record = Record.new(zone, '', { |
|
|
|
'ttl': 30, |
|
|
|
'type': 'NS', |
|
|
|
'value': '1.2.3.4.', |
|
|
|
}) |
|
|
|
zone.add_record(record) |
|
|
|
|
|
|
|
# No root NS support, no change, thus no plan |
|
|
|
provider = HelperProvider() |
|
|
|
self.assertEqual(None, provider.plan(zone)) |
|
|
|
|
|
|
|
# set Support root NS records, see the record |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
plan = provider.plan(zone) |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(1, len(plan.changes)) |
|
|
|
|
|
|
|
def test_apply(self): |
|
|
|
ignored = Zone('unit.tests.', []) |
|
|
|
|
|
|
|
@ -258,6 +299,37 @@ class TestBaseProvider(TestCase): |
|
|
|
# We filtered out the only change |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
def test_plan_order_of_operations(self): |
|
|
|
|
|
|
|
class MockProvider(BaseProvider): |
|
|
|
log = getLogger('mock-provider') |
|
|
|
SUPPORTS = set(('A',)) |
|
|
|
SUPPORTS_GEO = False |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
super().__init__('mock-provider') |
|
|
|
self.calls = [] |
|
|
|
|
|
|
|
def populate(self, *args, **kwargs): |
|
|
|
self.calls.append('populate') |
|
|
|
|
|
|
|
def _process_desired_zone(self, *args, **kwargs): |
|
|
|
self.calls.append('_process_desired_zone') |
|
|
|
return super()._process_desired_zone(*args, **kwargs) |
|
|
|
|
|
|
|
def _process_existing_zone(self, *args, **kwargs): |
|
|
|
self.calls.append('_process_existing_zone') |
|
|
|
return super()._process_existing_zone(*args, **kwargs) |
|
|
|
|
|
|
|
provider = MockProvider() |
|
|
|
|
|
|
|
zone = Zone('unit.tests.', []) |
|
|
|
self.assertFalse(provider.plan(zone)) |
|
|
|
# ensure the calls were made in the expected order, populate comes |
|
|
|
# first, then desired, then existing |
|
|
|
self.assertEqual(['populate', '_process_desired_zone', |
|
|
|
'_process_existing_zone'], provider.calls) |
|
|
|
|
|
|
|
def test_process_desired_zone(self): |
|
|
|
provider = HelperProvider('test') |
|
|
|
|
|
|
|
@ -278,10 +350,6 @@ class TestBaseProvider(TestCase): |
|
|
|
provider.SUPPORTS_MULTIVALUE_PTR = True |
|
|
|
zone2 = provider._process_desired_zone(zone1.copy()) |
|
|
|
record2 = list(zone2.records)[0] |
|
|
|
from pprint import pprint |
|
|
|
pprint([ |
|
|
|
record1, record2 |
|
|
|
]) |
|
|
|
self.assertEqual(len(record2.values), 2) |
|
|
|
|
|
|
|
# SUPPORTS_DYNAMIC |
|
|
|
@ -329,6 +397,45 @@ class TestBaseProvider(TestCase): |
|
|
|
'obey' |
|
|
|
) |
|
|
|
|
|
|
|
# SUPPORTS_ROOT_NS |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
zone1 = Zone('unit.tests.', []) |
|
|
|
record1 = Record.new(zone1, '', { |
|
|
|
'type': 'NS', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['foo.com.', 'bar.com.'], |
|
|
|
}) |
|
|
|
zone1.add_record(record1) |
|
|
|
|
|
|
|
zone2 = provider._process_desired_zone(zone1.copy()) |
|
|
|
self.assertEqual(0, len(zone2.records)) |
|
|
|
|
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
zone2 = provider._process_desired_zone(zone1.copy()) |
|
|
|
self.assertEqual(1, len(zone2.records)) |
|
|
|
self.assertEqual(record1, list(zone2.records)[0]) |
|
|
|
|
|
|
|
def test_process_existing_zone(self): |
|
|
|
provider = HelperProvider('test') |
|
|
|
|
|
|
|
# SUPPORTS_ROOT_NS |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
zone1 = Zone('unit.tests.', []) |
|
|
|
record1 = Record.new(zone1, '', { |
|
|
|
'type': 'NS', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['foo.com.', 'bar.com.'], |
|
|
|
}) |
|
|
|
zone1.add_record(record1) |
|
|
|
|
|
|
|
zone2 = provider._process_existing_zone(zone1.copy(), zone1) |
|
|
|
self.assertEqual(0, len(zone2.records)) |
|
|
|
|
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
zone2 = provider._process_existing_zone(zone1.copy(), zone1) |
|
|
|
self.assertEqual(1, len(zone2.records)) |
|
|
|
self.assertEqual(record1, list(zone2.records)[0]) |
|
|
|
|
|
|
|
def test_safe_none(self): |
|
|
|
# No changes is safe |
|
|
|
Plan(None, None, [], True).raise_if_unsafe() |
|
|
|
@ -552,3 +659,265 @@ class TestBaseProvider(TestCase): |
|
|
|
strict.supports_warn_or_except('Hello World!', 'Will not see') |
|
|
|
self.assertEqual('minimal: Hello World!', str(ctx.exception)) |
|
|
|
strict.log.warning.assert_not_called() |
|
|
|
|
|
|
|
|
|
|
|
class TestBaseProviderSupportsRootNs(TestCase): |
|
|
|
|
|
|
|
class Provider(BaseProvider): |
|
|
|
log = getLogger('Provider') |
|
|
|
|
|
|
|
SUPPORTS = set(('A', 'NS')) |
|
|
|
SUPPORTS_GEO = False |
|
|
|
SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
strict_supports = False |
|
|
|
|
|
|
|
def __init__(self, existing=None): |
|
|
|
super().__init__('test') |
|
|
|
self.existing = existing |
|
|
|
|
|
|
|
def populate(self, zone, target=False, lenient=False): |
|
|
|
if self.existing: |
|
|
|
for record in self.existing.records: |
|
|
|
zone.add_record(record) |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
|
|
zone = Zone('unit.tests.', []) |
|
|
|
a_record = Record.new(zone, 'ptr', { |
|
|
|
'type': 'A', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['1.2.3.4', '2.3.4.5'], |
|
|
|
}) |
|
|
|
ns_record = Record.new(zone, 'sub', { |
|
|
|
'type': 'NS', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['ns2.foo.com.', 'ns2.bar.com.'], |
|
|
|
}) |
|
|
|
no_root = zone.copy() |
|
|
|
no_root.add_record(a_record) |
|
|
|
no_root.add_record(ns_record) |
|
|
|
|
|
|
|
root_ns_record = Record.new(zone, '', { |
|
|
|
'type': 'NS', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['ns1.foo.com.', 'ns1.bar.com.'], |
|
|
|
}) |
|
|
|
has_root = no_root.copy() |
|
|
|
has_root.add_record(root_ns_record) |
|
|
|
|
|
|
|
other_root_ns_record = Record.new(zone, '', { |
|
|
|
'type': 'NS', |
|
|
|
'ttl': 3600, |
|
|
|
'values': ['ns4.foo.com.', 'ns4.bar.com.'], |
|
|
|
}) |
|
|
|
different_root = no_root.copy() |
|
|
|
different_root.add_record(other_root_ns_record) |
|
|
|
|
|
|
|
# False |
|
|
|
|
|
|
|
def test_supports_root_ns_false_matches(self): |
|
|
|
# provider has a matching existing root record |
|
|
|
provider = self.Provider(self.has_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
# matching root NS in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# no root ns upport on the target provider so doesn't matter, still no |
|
|
|
# changes |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
# plan again with strict_supports enabled, we should get an exception |
|
|
|
# b/c we have something configured that can't be managed |
|
|
|
provider.strict_supports = True |
|
|
|
with self.assertRaises(SupportsException) as ctx: |
|
|
|
provider.plan(self.has_root) |
|
|
|
self.assertEqual('test: root NS record not supported for unit.tests.', |
|
|
|
str(ctx.exception)) |
|
|
|
|
|
|
|
def test_supports_root_ns_false_different(self): |
|
|
|
# provider has a non-matching existing record |
|
|
|
provider = self.Provider(self.different_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
# different root is in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# the mis-match doesn't matter since we can't manage the records |
|
|
|
# anyway, they will have been removed from the desired and existing. |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
# plan again with strict_supports enabled, we should get an exception |
|
|
|
# b/c we have something configured that can't be managed (doesn't |
|
|
|
# matter that it's a mis-match) |
|
|
|
provider.strict_supports = True |
|
|
|
with self.assertRaises(SupportsException) as ctx: |
|
|
|
provider.plan(self.has_root) |
|
|
|
self.assertEqual('test: root NS record not supported for unit.tests.', |
|
|
|
str(ctx.exception)) |
|
|
|
|
|
|
|
def test_supports_root_ns_false_missing(self): |
|
|
|
# provider has an existing record |
|
|
|
provider = self.Provider(self.has_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
# desired doesn't have a root |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
|
|
|
|
# the mis-match doesn't matter since we can't manage the records |
|
|
|
# anyway, they will have been removed from the desired and existing. |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
# plan again with strict supports enabled, no change since desired |
|
|
|
# isn't asking to manage root |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
def test_supports_root_ns_false_create_zone(self): |
|
|
|
# provider has no existing records (create) |
|
|
|
provider = self.Provider() |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
# case where we have a root NS in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# no support for root NS so we only create the other two records |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(2, len(plan.changes)) |
|
|
|
|
|
|
|
# plan again with strict supports enabled, we'll get an exception b/c |
|
|
|
# the target provider can't manage something in desired |
|
|
|
provider.strict_supports = True |
|
|
|
with self.assertRaises(SupportsException) as ctx: |
|
|
|
provider.plan(self.has_root) |
|
|
|
self.assertEqual('test: root NS record not supported for unit.tests.', |
|
|
|
str(ctx.exception)) |
|
|
|
|
|
|
|
def test_supports_root_ns_false_create_zone_missing(self): |
|
|
|
# provider has no existing records (create) |
|
|
|
provider = self.Provider() |
|
|
|
provider.SUPPORTS_ROOT_NS = False |
|
|
|
|
|
|
|
# case where we have a root NS in the desired |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
|
|
|
|
# no support for root NS so we only create the other two records |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(2, len(plan.changes)) |
|
|
|
|
|
|
|
# plan again with strict supports enabled, same result since we're not |
|
|
|
# asking for a root NS it's just the 2 other changes |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(2, len(plan.changes)) |
|
|
|
|
|
|
|
# True |
|
|
|
|
|
|
|
def test_supports_root_ns_true_matches(self): |
|
|
|
# provider has a matching existing root record |
|
|
|
provider = self.Provider(self.has_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
|
|
|
|
# same root NS in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# root NS is supported in the target provider, but they match so no |
|
|
|
# change |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
# again with strict supports enabled, no difference |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
def test_supports_root_ns_true_different(self): |
|
|
|
# provider has a non-matching existing record |
|
|
|
provider = self.Provider(self.different_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
|
|
|
|
# non-matching root NS in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# root NS mismatch in a target provider that supports it, we'll see the |
|
|
|
# change |
|
|
|
self.assertTrue(plan) |
|
|
|
change = plan.changes[0] |
|
|
|
self.assertEqual(self.other_root_ns_record, change.existing) |
|
|
|
self.assertEqual(self.root_ns_record, change.new) |
|
|
|
|
|
|
|
# again with strict supports enabled, no difference, we see the change |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
self.assertTrue(plan) |
|
|
|
change = plan.changes[0] |
|
|
|
self.assertEqual(self.other_root_ns_record, change.existing) |
|
|
|
self.assertEqual(self.root_ns_record, change.new) |
|
|
|
|
|
|
|
def test_supports_root_ns_true_missing(self): |
|
|
|
# provider has a matching existing root record |
|
|
|
provider = self.Provider(self.has_root) |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
|
|
|
|
# there's no root record in the desired |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
|
|
|
|
# the existing root NS in the target is left alone/as is since we |
|
|
|
# aren't configured with one to manage |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
# again with strict supports enabled, no difference as non-configured |
|
|
|
# root NS is a special case that we always just warn about. This is |
|
|
|
# because we can't known them before it's created and some people may |
|
|
|
# choose to just leave them unmanaged undefinitely which has been the |
|
|
|
# behavior up until now. |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
self.assertFalse(plan) |
|
|
|
|
|
|
|
def test_supports_root_ns_true_create_zone(self): |
|
|
|
# provider has no existing records (create) |
|
|
|
provider = self.Provider() |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
|
|
|
|
# case where we have a root NS in the desired |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
|
|
|
|
# there's no existing root record since we're creating the zone so |
|
|
|
# we'll get a plan that creates everything, including it |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(3, len(plan.changes)) |
|
|
|
change = [c for c in plan.changes |
|
|
|
if c.new.name == '' and c.new._type == 'NS'][0] |
|
|
|
self.assertFalse(change.existing) |
|
|
|
self.assertEqual(self.root_ns_record, change.new) |
|
|
|
|
|
|
|
# again with strict supports enabled, no difference, we see all 3 |
|
|
|
# changes |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.has_root) |
|
|
|
self.assertTrue(plan) |
|
|
|
self.assertEqual(3, len(plan.changes)) |
|
|
|
change = [c for c in plan.changes |
|
|
|
if c.new.name == '' and c.new._type == 'NS'][0] |
|
|
|
self.assertFalse(change.existing) |
|
|
|
self.assertEqual(self.root_ns_record, change.new) |
|
|
|
|
|
|
|
def test_supports_root_ns_true_create_zone_missing(self): |
|
|
|
# provider has no existing records (create) |
|
|
|
provider = self.Provider() |
|
|
|
provider.SUPPORTS_ROOT_NS = True |
|
|
|
|
|
|
|
# we don't have a root NS configured so we'll ignore them and just |
|
|
|
# manage the other records |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
self.assertEqual(2, len(plan.changes)) |
|
|
|
|
|
|
|
# again with strict supports enabled, we'd normally throw an exception, |
|
|
|
# but since this is a create and we often can't know the root NS values |
|
|
|
# before the zone is created it's special cased and will only warn |
|
|
|
provider.strict_supports = True |
|
|
|
plan = provider.plan(self.no_root) |
|
|
|
self.assertEqual(2, len(plan.changes)) |