diff --git a/octodns/manager.py b/octodns/manager.py index 86b7f24..2d96c96 100644 --- a/octodns/manager.py +++ b/octodns/manager.py @@ -6,6 +6,7 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals from StringIO import StringIO +from concurrent.futures import ThreadPoolExecutor from importlib import import_module from os import environ import logging @@ -46,6 +47,10 @@ class Manager(object): with open(config_file, 'r') as fh: self.config = safe_load(fh, enforce_order=False) + manager_config = self.config.get('manager', {}) + max_workers = manager_config.get('max_workers', 4) + self._executor = ThreadPoolExecutor(max_workers=max_workers) + self.log.debug('__init__: configuring providers') self.providers = {} for provider_name, provider_config in self.config['providers'].items(): @@ -135,6 +140,24 @@ class Manager(object): self.log.debug('configured_sub_zones: subs=%s', sub_zone_names) return set(sub_zone_names) + def _populate_and_plan(self, zone_name, sources, targets): + + self.log.debug('sync: populating, zone=%s', zone_name) + zone = Zone(zone_name, + sub_zones=self.configured_sub_zones(zone_name)) + for source in sources: + source.populate(zone) + + self.log.debug('sync: planning, zone=%s', zone_name) + plans = [] + + for target in targets: + plan = target.plan(zone) + if plan: + plans.append((target, plan)) + + return plans + def sync(self, eligible_zones=[], eligible_targets=[], dry_run=True, force=False): self.log.info('sync: eligible_zones=%s, eligible_targets=%s, ' @@ -145,7 +168,7 @@ class Manager(object): if eligible_zones: zones = filter(lambda d: d[0] in eligible_zones, zones) - plans = [] + futures = [] for zone_name, config in zones: self.log.info('sync: zone=%s', zone_name) try: @@ -181,17 +204,12 @@ class Manager(object): raise Exception('Zone {}, unknown target: {}'.format(zone_name, target)) - self.log.debug('sync: populating') - zone = Zone(zone_name, - sub_zones=self.configured_sub_zones(zone_name)) - for source in sources: - source.populate(zone) + futures.append(self._executor.submit(self._populate_and_plan, + zone_name, sources, targets)) - self.log.debug('sync: planning') - for target in targets: - plan = target.plan(zone) - if plan: - plans.append((target, plan)) + # Wait on all results and unpack/flatten them in to a list of target & + # plan pairs. + plans = [p for f in futures for p in f.result()] hr = '*************************************************************' \ '*******************\n' diff --git a/tests/config/simple.yaml b/tests/config/simple.yaml index 604b772..cf970a9 100644 --- a/tests/config/simple.yaml +++ b/tests/config/simple.yaml @@ -1,3 +1,5 @@ +manager: + max_workers: 2 providers: in: class: octodns.provider.yaml.YamlProvider diff --git a/tests/test_octodns_manager.py b/tests/test_octodns_manager.py index 7ee4fbd..a6c4bce 100644 --- a/tests/test_octodns_manager.py +++ b/tests/test_octodns_manager.py @@ -128,6 +128,9 @@ class TestManager(TestCase): environ['YAML_TMP_DIR'] = tmpdir.dirname manager = Manager(get_config_filename('simple.yaml')) + # make sure this was pulled in from the config + self.assertEquals(2, manager._executor._max_workers) + changes = manager.compare(['in'], ['in'], 'unit.tests.') self.assertEquals([], changes)