|
|
@ -6,6 +6,7 @@ from __future__ import absolute_import, division, print_function, \ |
|
|
unicode_literals |
|
|
unicode_literals |
|
|
|
|
|
|
|
|
from StringIO import StringIO |
|
|
from StringIO import StringIO |
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from importlib import import_module |
|
|
from importlib import import_module |
|
|
from os import environ |
|
|
from os import environ |
|
|
import logging |
|
|
import logging |
|
|
@ -46,6 +47,10 @@ class Manager(object): |
|
|
with open(config_file, 'r') as fh: |
|
|
with open(config_file, 'r') as fh: |
|
|
self.config = safe_load(fh, enforce_order=False) |
|
|
self.config = safe_load(fh, enforce_order=False) |
|
|
|
|
|
|
|
|
|
|
|
manager_config = self.config.get('manager', {}) |
|
|
|
|
|
max_workers = manager_config.get('max_workers', 4) |
|
|
|
|
|
self._executor = ThreadPoolExecutor(max_workers=max_workers) |
|
|
|
|
|
|
|
|
self.log.debug('__init__: configuring providers') |
|
|
self.log.debug('__init__: configuring providers') |
|
|
self.providers = {} |
|
|
self.providers = {} |
|
|
for provider_name, provider_config in self.config['providers'].items(): |
|
|
for provider_name, provider_config in self.config['providers'].items(): |
|
|
@ -135,6 +140,24 @@ class Manager(object): |
|
|
self.log.debug('configured_sub_zones: subs=%s', sub_zone_names) |
|
|
self.log.debug('configured_sub_zones: subs=%s', sub_zone_names) |
|
|
return set(sub_zone_names) |
|
|
return set(sub_zone_names) |
|
|
|
|
|
|
|
|
|
|
|
def _populate_and_plan(self, zone_name, sources, targets): |
|
|
|
|
|
|
|
|
|
|
|
self.log.debug('sync: populating, zone=%s', zone_name) |
|
|
|
|
|
zone = Zone(zone_name, |
|
|
|
|
|
sub_zones=self.configured_sub_zones(zone_name)) |
|
|
|
|
|
for source in sources: |
|
|
|
|
|
source.populate(zone) |
|
|
|
|
|
|
|
|
|
|
|
self.log.debug('sync: planning, zone=%s', zone_name) |
|
|
|
|
|
plans = [] |
|
|
|
|
|
|
|
|
|
|
|
for target in targets: |
|
|
|
|
|
plan = target.plan(zone) |
|
|
|
|
|
if plan: |
|
|
|
|
|
plans.append((target, plan)) |
|
|
|
|
|
|
|
|
|
|
|
return plans |
|
|
|
|
|
|
|
|
def sync(self, eligible_zones=[], eligible_targets=[], dry_run=True, |
|
|
def sync(self, eligible_zones=[], eligible_targets=[], dry_run=True, |
|
|
force=False): |
|
|
force=False): |
|
|
self.log.info('sync: eligible_zones=%s, eligible_targets=%s, ' |
|
|
self.log.info('sync: eligible_zones=%s, eligible_targets=%s, ' |
|
|
@ -145,7 +168,7 @@ class Manager(object): |
|
|
if eligible_zones: |
|
|
if eligible_zones: |
|
|
zones = filter(lambda d: d[0] in eligible_zones, zones) |
|
|
zones = filter(lambda d: d[0] in eligible_zones, zones) |
|
|
|
|
|
|
|
|
plans = [] |
|
|
|
|
|
|
|
|
futures = [] |
|
|
for zone_name, config in zones: |
|
|
for zone_name, config in zones: |
|
|
self.log.info('sync: zone=%s', zone_name) |
|
|
self.log.info('sync: zone=%s', zone_name) |
|
|
try: |
|
|
try: |
|
|
@ -181,17 +204,12 @@ class Manager(object): |
|
|
raise Exception('Zone {}, unknown target: {}'.format(zone_name, |
|
|
raise Exception('Zone {}, unknown target: {}'.format(zone_name, |
|
|
target)) |
|
|
target)) |
|
|
|
|
|
|
|
|
self.log.debug('sync: populating') |
|
|
|
|
|
zone = Zone(zone_name, |
|
|
|
|
|
sub_zones=self.configured_sub_zones(zone_name)) |
|
|
|
|
|
for source in sources: |
|
|
|
|
|
source.populate(zone) |
|
|
|
|
|
|
|
|
futures.append(self._executor.submit(self._populate_and_plan, |
|
|
|
|
|
zone_name, sources, targets)) |
|
|
|
|
|
|
|
|
self.log.debug('sync: planning') |
|
|
|
|
|
for target in targets: |
|
|
|
|
|
plan = target.plan(zone) |
|
|
|
|
|
if plan: |
|
|
|
|
|
plans.append((target, plan)) |
|
|
|
|
|
|
|
|
# Wait on all results and unpack/flatten them in to a list of target & |
|
|
|
|
|
# plan pairs. |
|
|
|
|
|
plans = [p for f in futures for p in f.result()] |
|
|
|
|
|
|
|
|
hr = '*************************************************************' \ |
|
|
hr = '*************************************************************' \ |
|
|
'*******************\n' |
|
|
'*******************\n' |
|
|
|