# # # from os import environ, listdir, remove from os.path import dirname, isfile, join from unittest import TestCase from unittest.mock import MagicMock, patch from helpers import ( DummySecrets, DynamicProvider, GeoProvider, NoSshFpProvider, PlannableProvider, SimpleProvider, TemporaryDirectory, ) from octodns import __version__ from octodns.context import ContextDict from octodns.idna import IdnaDict, idna_encode from octodns.manager import ( MainThreadExecutor, Manager, ManagerException, _AggregateTarget, ) from octodns.processor.base import BaseProcessor from octodns.provider.yaml import YamlProvider from octodns.record import Create, Delete, Record, Update from octodns.secret.environ import EnvironSecretsException from octodns.yaml import safe_load from octodns.zone import Zone config_dir = join(dirname(__file__), 'config') def get_config_filename(which): return join(config_dir, which) def reset(directory): for filename in listdir(directory): if filename.endswith('.yaml'): filename = join(directory, filename) remove(filename) class TestManager(TestCase): def test_missing_provider_class(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('missing-provider-class.yaml')).sync() self.assertTrue('missing class' in str(ctx.exception)) def test_bad_provider_class(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('bad-provider-class.yaml')).sync() self.assertTrue('Unknown provider class' in str(ctx.exception)) def test_bad_provider_class_module(self): with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('bad-provider-class-module.yaml') ).sync() self.assertTrue('Unknown provider class' in str(ctx.exception)) def test_bad_provider_class_no_module(self): with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('bad-provider-class-no-module.yaml') ).sync() self.assertTrue('Unknown provider class' in str(ctx.exception)) def test_missing_provider_config(self): # Missing provider config with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('missing-provider-config.yaml')).sync() self.assertTrue('provider config' in str(ctx.exception)) def test_missing_env_config(self): # details of the EnvironSecrets will be tested in dedicated tests with self.assertRaises(EnvironSecretsException) as ctx: Manager(get_config_filename('missing-provider-env.yaml')).sync() self.assertTrue('missing env var' in str(ctx.exception)) def test_missing_source(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('provider-problems.yaml')).sync( ['missing.sources.'] ) self.assertTrue('missing sources' in str(ctx.exception)) def test_missing_zone(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('dynamic-config.yaml')).sync( ['missing.zones.'] ) self.assertTrue('Requested zone ' in str(ctx.exception)) def test_missing_targets(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('provider-problems.yaml')).sync( ['missing.targets.'] ) self.assertTrue('missing targets' in str(ctx.exception)) def test_unknown_source(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('provider-problems.yaml')).sync( ['unknown.source.'] ) self.assertTrue('unknown source' in str(ctx.exception)) def test_unknown_target(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('provider-problems.yaml')).sync( ['unknown.target.'] ) self.assertTrue('unknown target' in str(ctx.exception)) def test_bad_plan_output_class(self): with self.assertRaises(ManagerException) as ctx: name = 'bad-plan-output-missing-class.yaml' Manager(get_config_filename(name)).sync() self.assertTrue( 'plan_output bad is missing class' in str(ctx.exception) ) def test_bad_plan_output_config(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('bad-plan-output-config.yaml')).sync() self.assertTrue( 'Incorrect plan_output config for bad' in str(ctx.exception) ) def test_source_only_as_a_target(self): with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('provider-problems.yaml')).sync( ['not.targetable.'] ) self.assertTrue('does not support targeting' in str(ctx.exception)) def test_always_dry_run(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname tc = Manager(get_config_filename('always-dry-run.yaml')).sync( dry_run=False ) # only the stuff from subzone, unit.tests. is always-dry-run self.assertEqual(3, tc) def test_simple(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname tc = Manager(get_config_filename('simple.yaml')).sync(dry_run=False) self.assertEqual(28, tc) # try with just one of the zones reset(tmpdir.dirname) tc = Manager(get_config_filename('simple.yaml')).sync( dry_run=False, eligible_zones=['unit.tests.'] ) self.assertEqual(22, tc) # the subzone, with 2 targets reset(tmpdir.dirname) tc = Manager(get_config_filename('simple.yaml')).sync( dry_run=False, eligible_zones=['subzone.unit.tests.'] ) self.assertEqual(6, tc) # and finally the empty zone reset(tmpdir.dirname) tc = Manager(get_config_filename('simple.yaml')).sync( dry_run=False, eligible_zones=['empty.'] ) self.assertEqual(0, tc) # Again with force reset(tmpdir.dirname) tc = Manager(get_config_filename('simple.yaml')).sync( dry_run=False, force=True ) self.assertEqual(28, tc) # Again with max_workers = 1 reset(tmpdir.dirname) tc = Manager( get_config_filename('simple.yaml'), max_workers=1 ).sync(dry_run=False, force=True) self.assertEqual(28, tc) # Include meta reset(tmpdir.dirname) tc = Manager( get_config_filename('simple.yaml'), max_workers=1, include_meta=True, ).sync(dry_run=False, force=True) self.assertEqual(33, tc) def test_enable_checksum(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname manager = Manager( get_config_filename('simple.yaml'), enable_checksum=True ) # initial/dry run is fine w/o checksum tc = manager.sync(dry_run=True) self.assertEqual(0, tc) # trying to apply it fails w/o required checksum with self.assertRaises(ManagerException) as ctx: manager.sync(dry_run=False) msg, checksum = str(ctx.exception).rsplit('=', 1) self.assertEqual('checksum=None does not match computed', msg) self.assertTrue(checksum) # wrong checksum fails with self.assertRaises(ManagerException) as ctx: manager.sync(checksum='xyz') msg, checksum = str(ctx.exception).rsplit('=', 1) self.assertEqual('checksum=xyz does not match computed', msg) self.assertTrue(checksum) # correct checksum applies (w/o dry_run=False) tc = manager.sync(checksum=checksum) self.assertEqual(28, tc) def test_idna_eligible_zones(self): # loading w/simple, but we'll be blowing it away and doing some manual # stuff manager = Manager(get_config_filename('simple.yaml')) # these configs won't be valid, but that's fine we can test what we're # after based on exceptions raised manager.config['zones'] = manager._config_zones( {'déjà.vu.': {}, 'deja.vu.': {}, idna_encode('こんにちは.jp.'): {}} ) # refer to them with utf-8 with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=('déjà.vu.',)) self.assertEqual('Zone déjà.vu. is missing sources', str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=('deja.vu.',)) self.assertEqual('Zone deja.vu. is missing sources', str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=('こんにちは.jp.',)) self.assertEqual( 'Zone こんにちは.jp. is missing sources', str(ctx.exception) ) # refer to them with idna (exceptions are still utf-8 with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=(idna_encode('déjà.vu.'),)) self.assertEqual('Zone déjà.vu. is missing sources', str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=(idna_encode('deja.vu.'),)) self.assertEqual('Zone deja.vu. is missing sources', str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: manager.sync(eligible_zones=(idna_encode('こんにちは.jp.'),)) self.assertEqual( 'Zone こんにちは.jp. is missing sources', str(ctx.exception) ) def test_eligible_sources(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname # Only allow a target that doesn't exist tc = Manager(get_config_filename('simple.yaml')).sync( eligible_sources=['foo'] ) self.assertEqual(0, tc) def test_eligible_targets(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname # Only allow a target that doesn't exist tc = Manager(get_config_filename('simple.yaml')).sync( eligible_targets=['foo'] ) self.assertEqual(0, tc) def test_aliases(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname # Alias zones with a valid target. tc = Manager(get_config_filename('simple-alias-zone.yaml')).sync() self.assertEqual(0, tc) # Alias zone with an invalid target. with self.assertRaises(ManagerException) as ctx: tc = Manager( get_config_filename('unknown-source-zone.yaml') ).sync() self.assertEqual( 'Invalid alias zone alias.tests.: source zone ' 'does-not-exists.tests. does not exist', str(ctx.exception), ) # Alias zone that points to another alias zone. with self.assertRaises(ManagerException) as ctx: tc = Manager(get_config_filename('alias-zone-loop.yaml')).sync() self.assertEqual( 'Invalid alias zone alias-loop.tests.: source ' 'zone alias.tests. is an alias zone', str(ctx.exception), ) # Sync an alias without the zone it refers to with self.assertRaises(ManagerException) as ctx: tc = Manager( get_config_filename('simple-alias-zone.yaml') ).sync(eligible_zones=["alias.tests."]) self.assertEqual( 'Zone alias.tests. cannot be synced without zone ' 'unit.tests. sinced it is aliased', str(ctx.exception), ) def test_compare(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname manager = Manager(get_config_filename('simple.yaml')) # make sure this was pulled in from the config self.assertEqual(2, manager._executor._max_workers) changes = manager.compare(['in'], ['in'], 'unit.tests.') self.assertEqual([], changes) # Create an empty unit.test zone config with open(join(tmpdir.dirname, 'unit.tests.yaml'), 'w') as fh: fh.write('---\n{}') # compare doesn't use _process_desired_zone and thus doesn't filter # out root NS records, that seems fine/desirable changes = manager.compare(['in'], ['dump'], 'unit.tests.') self.assertEqual(23, len(changes)) # Compound sources with varying support changes = manager.compare( ['in', 'nosshfp'], ['dump'], 'unit.tests.' ) self.assertEqual(22, len(changes)) with self.assertRaises(ManagerException) as ctx: manager.compare(['nope'], ['dump'], 'unit.tests.') self.assertEqual('Unknown source: nope', str(ctx.exception)) def test_aggregate_target(self): simple = SimpleProvider() geo = GeoProvider() dynamic = DynamicProvider() nosshfp = NoSshFpProvider() targets = [simple, geo] at = _AggregateTarget(targets) # expected targets self.assertEqual(targets, at.targets) # union of their SUPPORTS self.assertEqual(set(('A')), at.SUPPORTS) # unknown property will go up into super and throw the normal # exception with self.assertRaises(AttributeError) as ctx: at.FOO self.assertEqual( '_AggregateTarget object has no attribute FOO', str(ctx.exception) ) self.assertFalse(_AggregateTarget([simple, simple]).SUPPORTS_GEO) self.assertFalse(_AggregateTarget([simple, geo]).SUPPORTS_GEO) self.assertFalse(_AggregateTarget([geo, simple]).SUPPORTS_GEO) self.assertTrue(_AggregateTarget([geo, geo]).SUPPORTS_GEO) self.assertFalse(_AggregateTarget([simple, simple]).SUPPORTS_DYNAMIC) self.assertFalse(_AggregateTarget([simple, dynamic]).SUPPORTS_DYNAMIC) self.assertFalse(_AggregateTarget([dynamic, simple]).SUPPORTS_DYNAMIC) self.assertTrue(_AggregateTarget([dynamic, dynamic]).SUPPORTS_DYNAMIC) zone = Zone('unit.tests.', []) record = Record.new( zone, 'sshfp', { 'ttl': 60, 'type': 'SSHFP', 'value': { 'algorithm': 1, 'fingerprint_type': 1, 'fingerprint': 'abcdefg', }, }, ) self.assertTrue(simple.supports(record)) self.assertFalse(nosshfp.supports(record)) self.assertTrue(_AggregateTarget([simple, simple]).supports(record)) self.assertFalse(_AggregateTarget([simple, nosshfp]).supports(record)) def test_dump(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname manager = Manager(get_config_filename('simple.yaml')) with self.assertRaises(ManagerException) as ctx: manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, split=True, sources=['nope'], ) self.assertEqual('Unknown source: nope', str(ctx.exception)) # specific zone manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, split=True, sources=['in'], ) self.assertEqual(['unit.tests.'], listdir(tmpdir.dirname)) # all configured zones manager.dump( zone='*', output_dir=tmpdir.dirname, split=True, sources=['in'] ) self.assertEqual( [ 'empty.', 'sub.txt.unit.tests.', 'subzone.unit.tests.', 'unit.tests.', ], sorted(listdir(tmpdir.dirname)), ) # make sure this fails with an ManagerException and not a KeyError # when trying to find sub zones with self.assertRaises(ManagerException): manager.dump( zone='unknown.zone.', output_dir=tmpdir.dirname, split=True, sources=['in'], ) def test_dump_empty(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname manager = Manager(get_config_filename('simple.yaml')) manager.dump( zone='empty.', output_dir=tmpdir.dirname, sources=['in'] ) with open(join(tmpdir.dirname, 'empty.yaml')) as fh: data = safe_load(fh, False) self.assertFalse(data) def test_dump_output_provider(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname # this time we'll use seperate tmp dirs with TemporaryDirectory() as tmpdir2: environ['YAML_TMP_DIR2'] = tmpdir2.dirname manager = Manager(get_config_filename('simple.yaml')) # we're going to tell it to use dump2 to do the dumping, but a # copy should be made and directory set to tmpdir.dirname # rather than 2's tmpdir2.dirname manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, output_provider='dump2', sources=['in'], ) self.assertTrue(isfile(join(tmpdir.dirname, 'unit.tests.yaml'))) self.assertFalse( isfile(join(tmpdir2.dirname, 'unit.tests.yaml')) ) # let's run that again, this time telling it to use tmpdir2 and # dump2 which should allow it to skip the copying manager.dump( zone='unit.tests.', output_dir=tmpdir2.dirname, output_provider='dump2', sources=['in'], ) self.assertTrue( isfile(join(tmpdir2.dirname, 'unit.tests.yaml')) ) # tell it to use an output_provider that doesn't exist with self.assertRaises(ManagerException) as ctx: manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, output_provider='nope', sources=['in'], ) self.assertEqual( 'Unknown output_provider: nope', str(ctx.exception) ) # tell it to use an output_provider that doesn't support # directory with self.assertRaises(ManagerException) as ctx: manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, output_provider='simple', sources=['in'], ) self.assertEqual( 'output_provider=simple, does not support ' 'directory property', str(ctx.exception), ) # hack a directory property onto the simple provider so that # it'll pass that check and fail the copy one instead manager.providers['simple'].directory = 42 with self.assertRaises(ManagerException) as ctx: manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, output_provider='simple', sources=['in'], ) self.assertEqual( 'output_provider=simple, does not support copy method', str(ctx.exception), ) def test_dump_split(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname manager = Manager(get_config_filename('simple-split.yaml')) with self.assertRaises(ManagerException) as ctx: manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, split=True, sources=['nope'], ) self.assertEqual('Unknown source: nope', str(ctx.exception)) manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, split=True, sources=['in'], ) # make sure this fails with an ManagerException and not a KeyError # when trying to find sub zones with self.assertRaises(ManagerException): manager.dump( zone='unknown.zone.', output_dir=tmpdir.dirname, split=True, sources=['in'], ) def test_dump_processors(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('dump-processors.yaml')) # Dump with processor that filters to only A records manager.dump( zone='unit.tests.', output_dir=tmpdir.dirname, sources=['config'], ) # Read the dumped file and verify only A records are present dumped = YamlProvider('dumped', tmpdir.dirname) zone = Zone('unit.tests.', []) dumped.populate(zone) # Should only have A records, not AAAA, CNAME, etc. record_types = {r._type for r in zone.records} self.assertIn('A', record_types) self.assertNotIn('AAAA', record_types) self.assertNotIn('CNAME', record_types) # Test unknown processor error with self.assertRaises(ManagerException) as ctx: manager.dump( zone='bad.unit.tests.', output_dir=tmpdir.dirname, sources=['config'], ) self.assertIn('unknown processor', str(ctx.exception)) def test_validate_configs(self): Manager(get_config_filename('simple-validate.yaml')).validate_configs() with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('missing-sources.yaml') ).validate_configs() self.assertTrue('missing sources' in str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('unknown-provider.yaml') ).validate_configs() self.assertTrue('unknown source' in str(ctx.exception)) # Alias zone using an invalid source zone. with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('unknown-source-zone.yaml') ).validate_configs() self.assertTrue('does not exist' in str(ctx.exception)) # Alias zone that points to another alias zone. with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('alias-zone-loop.yaml') ).validate_configs() self.assertTrue('is an alias zone' in str(ctx.exception)) # Valid config file using an alias zone. Manager( get_config_filename('simple-alias-zone.yaml') ).validate_configs() with self.assertRaises(ManagerException) as ctx: Manager( get_config_filename('unknown-processor.yaml') ).validate_configs() self.assertTrue('unknown processor' in str(ctx.exception)) def test_get_zone(self): Manager(get_config_filename('simple.yaml')).get_zone('unit.tests.') with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('simple.yaml')).get_zone('unit.tests') self.assertTrue('missing ending dot' in str(ctx.exception)) with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('simple.yaml')).get_zone( 'unknown-zone.tests.' ) self.assertTrue('Unknown zone name' in str(ctx.exception)) def test_populate_lenient_fallback(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname # Only allow a target that doesn't exist manager = Manager(get_config_filename('simple.yaml')) class NoLenient(SimpleProvider): def populate(self, zone): pass # This should be ok, we'll fall back to not passing it manager._populate_and_plan('unit.tests.', [], [NoLenient()], []) class OtherType(SimpleProvider): def populate(self, zone, lenient=False): raise TypeError('something else') # This will blow up, we don't fallback for source with self.assertRaises(TypeError) as ctx: manager._populate_and_plan('unit.tests.', [], [OtherType()], []) self.assertEqual('something else', str(ctx.exception)) def test_plan_processors_fallback(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname environ['YAML_TMP_DIR2'] = tmpdir.dirname # Only allow a target that doesn't exist manager = Manager(get_config_filename('simple.yaml')) class NoProcessors(SimpleProvider): def plan(self, zone): pass # This should be ok, we'll fall back to not passing it manager._populate_and_plan('unit.tests.', [], [], [NoProcessors()]) class OtherType(SimpleProvider): def plan(self, zone, processors): raise TypeError('something else') # This will blow up, we don't fallback for source with self.assertRaises(TypeError) as ctx: manager._populate_and_plan('unit.tests.', [], [], [OtherType()]) self.assertEqual('something else', str(ctx.exception)) @patch('octodns.manager.Manager._get_named_class') def test_sync_passes_file_handle(self, mock): plan_output_mock = MagicMock() plan_output_class_mock = MagicMock() plan_output_class_mock.return_value = plan_output_mock mock.return_value = (plan_output_class_mock, 'ignored', 'ignored') fh_mock = MagicMock() Manager(get_config_filename('plan-output-filehandle.yaml')).sync( plan_output_fh=fh_mock ) # Since we only care about the fh kwarg, and different _PlanOutputs are # are free to require arbitrary kwargs anyway, we concern ourselves # with checking the value of fh only. plan_output_mock.run.assert_called() _, kwargs = plan_output_mock.run.call_args self.assertEqual(fh_mock, kwargs.get('fh')) def test_processor_config(self): # Smoke test loading a valid config manager = Manager(get_config_filename('processors.yaml')) self.assertEqual( ['noop', 'test', 'global-counter'], list(manager.processors.keys()) ) # make sure we got the global processor and that it's count is 0 now self.assertEqual(['global-counter'], manager.global_processors) self.assertEqual(0, manager.processors['global-counter'].count) # This zone specifies a valid processor manager.sync(['unit.tests.']) # make sure the global processor ran and counted some records self.assertTrue(manager.processors['global-counter'].count >= 25) with self.assertRaises(ManagerException) as ctx: # This zone specifies a non-existent processor manager.sync(['bad.unit.tests.']) self.assertTrue( 'Zone bad.unit.tests., unknown processor: ' 'doesnt-exist' in str(ctx.exception) ) with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('processors-missing-class.yaml')) self.assertTrue( 'Processor no-class is missing class' in str(ctx.exception) ) with self.assertRaises(ManagerException) as ctx: Manager(get_config_filename('processors-wants-config.yaml')) self.assertTrue( 'Incorrect processor config for wants-config' in str(ctx.exception) ) def test_processors(self): manager = Manager(get_config_filename('simple.yaml')) targets = [PlannableProvider('prov')] zone = Zone('unit.tests.', []) record = Record.new( zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'} ) # muck with sources class MockProcessor(BaseProcessor): def process_source_zone(self, zone, sources): zone = zone.copy() zone.add_record(record) return zone mock = MockProcessor('mock') plans, zone = manager._populate_and_plan( 'unit.tests.', [mock], [], targets ) # Our mock was called and added the record self.assertEqual(record, list(zone.records)[0]) # We got a create for the thing added to the expected state (source) self.assertIsInstance(plans[0][1].changes[0], Create) # muck with targets class MockProcessor(BaseProcessor): def process_target_zone(self, zone, target): zone = zone.copy() zone.add_record(record) return zone mock = MockProcessor('mock') plans, zone = manager._populate_and_plan( 'unit.tests.', [mock], [], targets ) # No record added since it's target this time self.assertFalse(zone.records) # We got a delete for the thing added to the existing state (target) self.assertIsInstance(plans[0][1].changes[0], Delete) # source & target record2 = Record.new( zone, 'a2', {'ttl': 31, 'type': 'A', 'value': '1.2.3.4'} ) record3 = Record.new( zone, 'a3', {'ttl': 32, 'type': 'A', 'value': '1.2.3.4'} ) class MockProcessor(BaseProcessor): def process_source_and_target_zones( self, desired, existing, target ): # add something to desired desired.add_record(record2) # add something to existing existing.add_record(record3) # add something to both, but with a modification desired.add_record(record) mod = record.copy() mod.ttl += 1 existing.add_record(mod) return desired, existing mock = MockProcessor('mock') plans, zone = manager._populate_and_plan( 'unit.tests.', [mock], [], targets ) # we should see a plan self.assertTrue(plans) plan = plans[0][1] # it shoudl have a create, an update, and a delete self.assertEqual( 'a', next(c.record.name for c in plan.changes if isinstance(c, Update)), ) self.assertEqual( 'a2', next(c.record.name for c in plan.changes if isinstance(c, Create)), ) self.assertEqual( 'a3', next(c.record.name for c in plan.changes if isinstance(c, Delete)), ) # muck with plans class MockProcessor(BaseProcessor): def process_target_zone(self, zone, target): zone = zone.copy() zone.add_record(record) return zone def process_plan(self, plans, sources, target): # get rid of the change plans.changes.pop(0) mock = MockProcessor('mock') plans, zone = manager._populate_and_plan( 'unit.tests.', [mock], [], targets ) # We planned a delete again, but this time removed it from the plan, so # no plans self.assertFalse(plans) def test_try_version(self): manager = Manager(get_config_filename('simple.yaml')) class DummyModule(object): __version__ = '2.3.4' dummy_module = DummyModule() # use importlib.metadata.version self.assertTrue( __version__, manager._try_version( 'octodns', module=dummy_module, version='1.2.3' ), ) # use module self.assertTrue( manager._try_version('doesnt-exist', module=dummy_module) ) # fall back to version, preferred over module self.assertEqual( '1.2.3', manager._try_version( 'doesnt-exist', module=dummy_module, version='1.2.3' ), ) def test_subzone_handling(self): manager = Manager(get_config_filename('simple.yaml')) # tree with multiple branches, one that skips manager.config['zones'] = { 'unit.tests.': {}, 'sub.unit.tests.': {}, 'another.sub.unit.tests.': {}, 'skipped.alevel.unit.tests.': {}, } self.assertEqual( {'another.sub', 'sub', 'skipped.alevel'}, manager.configured_sub_zones('unit.tests.'), ) self.assertEqual( {'another'}, manager.configured_sub_zones('sub.unit.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('another.sub.unit.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('skipped.alevel.unit.tests.') ) # unknown zone names return empty set self.assertEqual(set(), manager.configured_sub_zones('unknown.tests.')) # two parallel trees, make sure they don't interfere manager.config['zones'] = { 'unit.tests.': {}, 'unit2.tests.': {}, 'sub.unit.tests.': {}, 'sub.unit2.tests.': {}, 'another.sub.unit.tests.': {}, 'another.sub.unit2.tests.': {}, 'skipped.alevel.unit.tests.': {}, 'skipped.alevel.unit2.tests.': {}, } manager._configured_sub_zones = None self.assertEqual( {'another.sub', 'sub', 'skipped.alevel'}, manager.configured_sub_zones('unit.tests.'), ) self.assertEqual( {'another'}, manager.configured_sub_zones('sub.unit.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('another.sub.unit.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('skipped.alevel.unit.tests.') ) self.assertEqual( {'another.sub', 'sub', 'skipped.alevel'}, manager.configured_sub_zones('unit2.tests.'), ) self.assertEqual( {'another'}, manager.configured_sub_zones('sub.unit2.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('another.sub.unit2.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('skipped.alevel.unit2.tests.') ) # zones that end with names of others manager.config['zones'] = { 'unit.tests.': {}, 'uunit.tests.': {}, 'uuunit.tests.': {}, } manager._configured_sub_zones = None self.assertEqual(set(), manager.configured_sub_zones('unit.tests.')) self.assertEqual(set(), manager.configured_sub_zones('uunit.tests.')) self.assertEqual(set(), manager.configured_sub_zones('uuunit.tests.')) # skipping multiple levels manager.config['zones'] = { 'unit.tests.': {}, 'foo.bar.baz.unit.tests.': {}, } manager._configured_sub_zones = None self.assertEqual( {'foo.bar.baz'}, manager.configured_sub_zones('unit.tests.') ) self.assertEqual( set(), manager.configured_sub_zones('foo.bar.baz.unit.tests.') ) # different TLDs manager.config['zones'] = { 'unit.tests.': {}, 'foo.unit.tests.': {}, 'unit.org.': {}, 'bar.unit.org.': {}, } manager._configured_sub_zones = None self.assertEqual({'foo'}, manager.configured_sub_zones('unit.tests.')) self.assertEqual(set(), manager.configured_sub_zones('foo.unit.tests.')) self.assertEqual({'bar'}, manager.configured_sub_zones('unit.org.')) self.assertEqual(set(), manager.configured_sub_zones('bar.unit.org.')) # starting a beyond 2 levels manager.config['zones'] = { 'foo.unit.tests.': {}, 'bar.foo.unit.tests.': {}, 'bleep.bloop.foo.unit.tests.': {}, } manager._configured_sub_zones = None self.assertEqual( {'bar', 'bleep.bloop'}, manager.configured_sub_zones('foo.unit.tests.'), ) self.assertEqual( set(), manager.configured_sub_zones('bar.foo.unit.tests.') ) def test_config_zones(self): manager = Manager(get_config_filename('simple.yaml')) # empty == empty self.assertEqual({}, manager._config_zones({})) # single ascii comes back as-is, but in a IdnaDict zones = manager._config_zones({'unit.tests.': 42}) self.assertEqual({'unit.tests.': 42}, zones) self.assertIsInstance(zones, IdnaDict) # single utf-8 comes back idna encoded self.assertEqual( {idna_encode('Déjà.vu.'): 42}, dict(manager._config_zones({'Déjà.vu.': 42})), ) # ascii and non-matching idna as ok self.assertEqual( {idna_encode('déjà.vu.'): 42, 'deja.vu.': 43}, dict( manager._config_zones( {idna_encode('déjà.vu.'): 42, 'deja.vu.': 43} ) ), ) with self.assertRaises(ManagerException) as ctx: # zone configured with both utf-8 and idna is an error manager._config_zones({'Déjà.vu.': 42, idna_encode('Déjà.vu.'): 43}) self.assertEqual( '"déjà.vu." configured both in utf-8 and idna "xn--dj-kia8a.vu."', str(ctx.exception), ) def test_auto_arpa(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('simple-arpa.yaml')) # provider config self.assertEqual( True, manager.providers.get("auto-arpa").populate_should_replace ) self.assertEqual(1800, manager.providers.get("auto-arpa").ttl) # processor config self.assertEqual( True, manager.processors.get("auto-arpa").populate_should_replace, ) self.assertEqual(1800, manager.processors.get("auto-arpa").ttl) # we can sync eligible_zones so long as they're not arpa tc = manager.sync(dry_run=False, eligible_zones=['unit.tests.']) self.assertEqual(22, tc) # can't do partial syncs that include arpa zones with self.assertRaises(ManagerException) as ctx: manager.sync( dry_run=False, eligible_zones=['unit.tests.', '3.2.2.in-addr.arpa.'], ) self.assertEqual( 'ARPA zones cannot be synced during partial runs when auto_arpa is enabled', str(ctx.exception), ) # same for eligible_sources reset(tmpdir.dirname) tc = manager.sync( dry_run=False, eligible_zones=['unit.tests.'], eligible_sources=['in'], ) self.assertEqual(22, tc) # can't do partial syncs that include arpa zones with self.assertRaises(ManagerException) as ctx: manager.sync(dry_run=False, eligible_sources=['in']) self.assertEqual( 'eligible_sources is incompatible with auto_arpa', str(ctx.exception), ) # same for eligible_targets reset(tmpdir.dirname) tc = manager.sync( dry_run=False, eligible_zones=['unit.tests.'], eligible_targets=['dump'], ) self.assertEqual(22, tc) # can't do partial syncs that include arpa zones with self.assertRaises(ManagerException) as ctx: manager.sync(dry_run=False, eligible_targets=['dump']) self.assertEqual( 'eligible_targets is incompatible with auto_arpa', str(ctx.exception), ) # full sync with arpa is fine, 2 extra records from it reset(tmpdir.dirname) tc = manager.sync(dry_run=False) self.assertEqual(26, tc) def test_dynamic_config_targeted(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('dynamic-config.yaml')) # two zones which should have been dynamically configured via # list_zones self.assertEqual( 29, manager.sync( eligible_zones=['unit.tests.', 'dynamic.tests.'], dry_run=False, ), ) # just subzone.unit.tests. which was explicitly configured self.assertEqual( 3, manager.sync( eligible_zones=['subzone.unit.tests.'], dry_run=False ), ) def test_dynamic_config_all(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('dynamic-config.yaml')) # should sync everything across all zones, total of 32 records self.assertEqual(32, manager.sync(dry_run=False)) def test_dynamic_config_with_arpa(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('dynamic-arpa.yaml')) # should sync everything across all zones, total of 7 records # 4 normal records and 3 arpa records generated self.assertEqual(4 + 3, manager.sync(dry_run=False)) def test_dynamic_config_with_arpa_no_normal_source(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager( get_config_filename('dynamic-arpa-no-normal-source.yaml') ) # should sync everything across all zones, total of 4 records # 4 normal records and 0 arpa records generated since no zones to populate was found self.assertEqual(4, manager.sync(dry_run=False)) def test_dynamic_config_unsupported_zone(self): manager = Manager( get_config_filename('dynamic-config-no-list-zones.yaml') ) with self.assertRaises(ManagerException) as ctx: manager.sync() self.assertTrue('does not support `list_zones`' in str(ctx.exception)) def test_build_kwargs(self): manager = Manager(get_config_filename('simple.yaml')) environ['OCTODNS_TEST_1'] = '42' environ['OCTODNS_TEST_2'] = 'string' environ['OCTODNS_TEST_3'] = '43.44' # empty self.assertEqual({}, manager._build_kwargs({})) # simple, no expansion self.assertEqual( {'key': 'val', 'a': 42, 'x': None}, manager._build_kwargs({'key': 'val', 'a': 42, 'x': None}), ) # top-level expansion self.assertEqual( {'secret': 42, 'another': 'string'}, manager._build_kwargs( { 'secret': 'env/OCTODNS_TEST_1', 'another': 'env/OCTODNS_TEST_2', } ), ) # 2nd-level expansion self.assertEqual( { 'parent': { 'secret': 42, 'another': 'string', 'key': 'value', 'f': 43, } }, manager._build_kwargs( { 'parent': { 'secret': 'env/OCTODNS_TEST_1', 'another': 'env/OCTODNS_TEST_2', 'key': 'value', 'f': 43, } } ), ) # 3rd-level expansion self.assertEqual( { 'parent': { 'child': { 'secret': 42, 'another': 'string', 'key': 'value', 'f': 43, } } }, manager._build_kwargs( { 'parent': { 'child': { 'secret': 'env/OCTODNS_TEST_1', 'another': 'env/OCTODNS_TEST_2', 'key': 'value', 'f': 43, } } } ), ) # types/conversion self.assertEqual( {'int': 42, 'string': 'string', 'float': 43.44}, manager._build_kwargs( { 'int': 'env/OCTODNS_TEST_1', 'string': 'env/OCTODNS_TEST_2', 'float': 'env/OCTODNS_TEST_3', } ), ) def test_config_secret_handlers(self): # config doesn't matter here manager = Manager(get_config_filename('simple.yaml')) # no config self.assertEqual({}, manager._config_secret_handlers({})) # missing class with self.assertRaises(ManagerException) as ctx: cfg = {'secr3t': ContextDict({}, context='xyz')} manager._config_secret_handlers(cfg) self.assertEqual( 'Secret Handler secr3t is missing class, xyz', str(ctx.exception) ) # bad param with self.assertRaises(ManagerException) as ctx: cfg = { 'secr3t': ContextDict( { 'class': 'octodns.secret.environ.EnvironSecrets', 'bad': 'param', }, context='xyz', ) } manager._config_secret_handlers(cfg) self.assertEqual( 'Incorrect secret handler config for secr3t, xyz', str(ctx.exception), ) # valid with a param that gets used/tested cfg = { 'secr3t': ContextDict( {'class': 'helpers.DummySecrets', 'prefix': 'pre-'}, context='xyz', ) } shs = manager._config_secret_handlers(cfg) sh = shs.get('secr3t') self.assertTrue(sh) self.assertEqual('pre-thing', sh.fetch('thing', None)) # test configuring secret handlers environ['FROM_ENV_WILL_WORK'] = 'fetched_from_env/' manager = Manager(get_config_filename('secrets.yaml')) # dummy was configured self.assertTrue('dummy' in manager.secret_handlers) dummy = manager.secret_handlers['dummy'] self.assertIsInstance(dummy, DummySecrets) # and has the prefix value explicitly stated in the yaml self.assertEqual('in_config/hello', dummy.fetch('hello', None)) # requires-env was configured self.assertTrue('requires-env' in manager.secret_handlers) requires_env = manager.secret_handlers['requires-env'] self.assertIsInstance(requires_env, DummySecrets) # and successfully pulled a value from env as its prefix self.assertEqual( 'fetched_from_env/hello', requires_env.fetch('hello', None) ) # requires-dummy was created self.assertTrue('requires-dummy' in manager.secret_handlers) requires_dummy = manager.secret_handlers['requires-dummy'] self.assertIsInstance(requires_dummy, DummySecrets) # but failed to fetch a secret from dummy so we just get the configured # value as it was in the yaml for prefix self.assertEqual( 'dummy/FROM_DUMMY_WONT_WORK:hello', requires_dummy.fetch(':hello', None), ) def test_zone_threshold(self): with TemporaryDirectory() as tmpdir: environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('zone-threshold.yaml')) zone = manager.get_zone('unit.tests.') self.assertEqual(0.2, zone.update_pcent_threshold) self.assertEqual(0.1, zone.delete_pcent_threshold) # subzone has different threshold subzone = manager.get_zone('subzone.unit.tests.') self.assertEqual(0.02, subzone.update_pcent_threshold) self.assertEqual(0.01, subzone.delete_pcent_threshold) # test default of None to ensure Provider precedence zone_with_defaults = manager.get_zone('defaultthresholds.tests.') self.assertIsNone(zone_with_defaults.update_pcent_threshold) self.assertIsNone(zone_with_defaults.delete_pcent_threshold) def test_preprocess_zones_original(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) # nothing returns nothing mock_source = MagicMock() got = manager._preprocess_zones({}, sources=[mock_source]) self.assertEqual({}, got) mock_source.list_zones.assert_not_called() # non-dynamic returns as-is, no calls to sources mock_source.reset_mock() zones = {'unit.tests.': {}} got = manager._preprocess_zones(zones, sources=[mock_source]) self.assertEqual(zones, got) mock_source.list_zones.assert_not_called() # source that doesn't support list_zones class SimpleSource: id = 'simple-source' # dynamic with a source that doesn't support it mock_source.reset_mock() zones = {'*': {}} with self.assertRaises(ManagerException) as ctx: manager._preprocess_zones(zones, sources=[SimpleSource()]) self.assertEqual( 'dynamic zone=* includes a source, simple-source, that does not support `list_zones`', str(ctx.exception), ) mock_source.list_zones.assert_not_called() # same, but w/a source supports it mock_source.reset_mock() config = {'foo': 42} zones = {'*': config} mock_source.list_zones.return_value = ['one', 'two', 'three'] got = manager._preprocess_zones(zones, sources=[mock_source]) self.assertEqual({'one': config, 'two': config, 'three': config}, got) mock_source.list_zones.assert_called_once() # same, but one of the zones is expliticly configured, so left alone mock_source.reset_mock() config = {'foo': 42} zones = {'*': config, 'two': {'bar': 43}} mock_source.list_zones.return_value = ['one', 'two', 'three'] got = manager._preprocess_zones(zones, sources=[mock_source]) self.assertEqual( {'one': config, 'two': {'bar': 43}, 'three': config}, got ) mock_source.list_zones.assert_called_once() # doesn't matter what the actual name is, just that it starts with a *, mock_source.reset_mock() config = {'foo': 42} zones = {'*SDFLKJSDFL': config, 'two': {'bar': 43}} mock_source.list_zones.return_value = ['one', 'two', 'three'] got = manager._preprocess_zones(zones, sources=[mock_source]) self.assertEqual( {'one': config, 'two': {'bar': 43}, 'three': config}, got ) mock_source.list_zones.assert_called_once() def test_preprocess_zones_multiple_single_source(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) manager._get_sources = MagicMock() mock_source = MagicMock() mock_source.id = 'mm' manager._get_sources = MagicMock() manager._get_sources.return_value = [mock_source] config_a = {'foo': 42} config_b = {'bar': 43} zones = {'*.a.com.': config_a, '*.b.com.': config_b} mock_source.list_zones.side_effect = [ ['one.a.com.', 'two.a.com.', 'one.b.com.', 'two.b.com.'] ] got = manager._preprocess_zones(zones, sources=[]) # each zone will have it's sources looked up self.assertEqual(2, manager._get_sources.call_count) # but there's only one source so it's zones will be cached self.assertEqual(1, mock_source.list_zones.call_count) # everything will have been matched by the first old style wildcard and # thus have its config, nothing will have b's self.assertEqual( { 'one.a.com.': config_a, 'two.a.com.': config_a, 'one.b.com.': config_a, 'two.b.com.': config_a, }, got, ) def test_preprocess_zones_multiple_seperate_sources(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) manager._get_sources = MagicMock() mock_source_a = MagicMock() mock_source_a.id = 'mm_a' mock_source_b = MagicMock() mock_source_b.id = 'mm_b' manager._get_sources = MagicMock() manager._get_sources.side_effect = [[mock_source_a], [mock_source_b]] config_a = {'foo': 42} config_b = {'bar': 43} zones = {'*.a.com.': config_a, '*.b.com.': config_b} mock_source_a.list_zones.side_effect = [['one.a.com.', 'two.a.com.']] mock_source_b.list_zones.side_effect = [['one.b.com.', 'two.b.com.']] got = manager._preprocess_zones(zones, sources=[]) # each zone will have it's sources looked up self.assertEqual(2, manager._get_sources.call_count) # so each mock will be called once self.assertEqual(1, mock_source_a.list_zones.call_count) self.assertEqual(1, mock_source_b.list_zones.call_count) # the souces from each source will be matched with the coresponding config self.assertEqual( { 'one.a.com.': config_a, 'two.a.com.': config_a, 'one.b.com.': config_b, 'two.b.com.': config_b, }, got, ) def test_preprocess_zones_glob(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) manager._get_sources = MagicMock() mock_source = MagicMock() mock_source.id = 'mm' manager._get_sources = MagicMock() manager._get_sources.return_value = [mock_source] # won't match anything config_n = {'foo': 42, 'glob': r'*.nope.com.'} # match things with .a. config_a = {'foo': 42, 'glob': r'*.a.com.'} # match things with .b. config_b = {'bar': 43, 'glob': r'*.b.com.'} # will match anything config_c = {'bar': 43, 'glob': r'*'} zones = { '*.nope.com.': config_n, '*.a.com.': config_a, '*.b.com.': config_b, '*': config_c, } mock_source.list_zones.return_value = [ # matched by a 'one.a.com.', # matched by a 'two.a.com.', # matched by b 'one.b.com.', # matched by b 'two.b.com.', # matched by c, catch all 'ignored.com.', ] got = manager._preprocess_zones(zones, sources=[]) # 4 configs self.assertEqual(4, manager._get_sources.call_count) # 1 shared source self.assertEqual(1, mock_source.list_zones.call_count) self.assertEqual( { 'one.a.com.': config_a, 'two.a.com.': config_a, 'one.b.com.': config_b, 'two.b.com.': config_b, 'ignored.com.': config_c, }, got, ) # if we define the catch all first it'll take everything and leave # nothing for the others zones = { '*': config_c, '*.nope.com.': config_n, '*.a.com.': config_a, '*.b.com.': config_b, } got = manager._preprocess_zones(zones, sources=[]) self.assertEqual( { 'one.a.com.': config_c, 'two.a.com.': config_c, 'one.b.com.': config_c, 'two.b.com.': config_c, 'ignored.com.': config_c, }, got, ) def test_preprocess_zones_regex(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) manager._get_sources = MagicMock() mock_source = MagicMock() mock_source.id = 'mm' manager._get_sources = MagicMock() manager._get_sources.return_value = [mock_source] # match things with .a. config_a = {'foo': 42, 'regex': r'\.a\.'} # match things with .b. config_b = {'bar': 43, 'regex': r'\.b\.'} zones = {'*.a.com.': config_a, '*.b.com.': config_b} mock_source.list_zones.side_effect = [ [ 'one.a.com.', 'two.a.com.', 'one.b.com.', 'two.b.com.', 'ignored.com.', ] ] got = manager._preprocess_zones(zones, sources=[]) self.assertEqual(2, manager._get_sources.call_count) self.assertEqual(1, mock_source.list_zones.call_count) # a will regex match .a.com., b will .b.com., ignored.com. won't match # anything self.assertEqual( { 'one.a.com.': config_a, 'two.a.com.': config_a, 'one.b.com.': config_b, 'two.b.com.': config_b, }, got, ) def test_preprocess_zones_regex_claimed(self): # these will be unused environ['YAML_TMP_DIR'] = '/tmp' environ['YAML_TMP_DIR2'] = '/tmp' manager = Manager(get_config_filename('simple.yaml')) manager._get_sources = MagicMock() mock_source = MagicMock() mock_source.id = 'mm' manager._get_sources = MagicMock() manager._get_sources.return_value = [mock_source] # match things with .a. config_a = {'foo': 42, 'regex': r'\.a\.'} # match everything config_b = {'bar': 43, 'regex': r'.*'} zones = {'*.a.com.': config_a, '*.b.com.': config_b} mock_source.list_zones.side_effect = [ [ # won't match a b/c no . before the a, will match b 'a.com.', # will match a, and be claimed 'one.a.com.', # will match a, and be claimed 'two.a.com.', # will match b 'one.b.com.', # will match b 'two.b.com.', # will match b 'ignored.com.', ] ] got = manager._preprocess_zones(zones, sources=[]) self.assertEqual(2, manager._get_sources.call_count) self.assertEqual(1, mock_source.list_zones.call_count) # a will regex match .a.com., b will .b.com., ignored.com. won't match # anything self.assertEqual( { 'a.com.': config_b, 'one.a.com.': config_a, 'two.a.com.': config_a, 'one.b.com.': config_b, 'two.b.com.': config_b, 'ignored.com.': config_b, }, got, ) class TestMainThreadExecutor(TestCase): def test_success(self): mte = MainThreadExecutor() future = mte.submit(self.success, 42) self.assertEqual(42, future.result()) future = mte.submit(self.success, ret=43) self.assertEqual(43, future.result()) def test_exception(self): mte = MainThreadExecutor() e = Exception('boom') future = mte.submit(self.exception, e) with self.assertRaises(Exception) as ctx: future.result() self.assertEqual(e, ctx.exception) future = mte.submit(self.exception, e=e) with self.assertRaises(Exception) as ctx: future.result() self.assertEqual(e, ctx.exception) def success(self, ret): return ret def exception(self, e): raise e