From d493d297dfcebfe5d5b42b6a34b70ab2e3e39bdd Mon Sep 17 00:00:00 2001
From: Ross McFarland
Date: Fri, 7 Dec 2018 15:18:00 -0800
Subject: [PATCH] WIP DynProvider dynamic implementation

---
 octodns/provider/dyn.py    | 462 ++++++++++++++++++++++++++++++++-----
 octodns/record/__init__.py |   6 +
 2 files changed, 407 insertions(+), 61 deletions(-)

diff --git a/octodns/provider/dyn.py b/octodns/provider/dyn.py
index f47f77b..1b9c868 100644
--- a/octodns/provider/dyn.py
+++ b/octodns/provider/dyn.py
@@ -7,9 +7,9 @@ from __future__ import absolute_import, division, print_function, \
 from collections import defaultdict
 
 from dyn.tm.errors import DynectGetError
-from dyn.tm.services.dsf import DSFARecord, DSFAAAARecord, DSFFailoverChain, \
-    DSFMonitor, DSFNode, DSFRecordSet, DSFResponsePool, DSFRuleset, \
-    TrafficDirector, get_all_dsf_monitors, get_all_dsf_services, \
+from dyn.tm.services.dsf import DSFARecord, DSFAAAARecord, DSFCNAMERecord, \
+    DSFFailoverChain, DSFMonitor, DSFNode, DSFRecordSet, DSFResponsePool, \
+    DSFRuleset, TrafficDirector, get_all_dsf_monitors, get_all_dsf_services, \
     get_response_pool
 from dyn.tm.session import DynectSession
 from dyn.tm.zones import Zone as DynZone
@@ -21,6 +21,9 @@ from ..record import Record, Update
 from .base import BaseProvider
 
+from pprint import pprint
+
+
 ###############################################################################
 #
 # The following monkey patching is to work around functionality that is lacking
@@ -232,6 +235,7 @@ class DynProvider(BaseProvider):
         'OC': 16,  # Continental Australia/Oceania
         'AN': 17,  # Continental Antarctica
     }
+    REGION_CODES_LOOKUP = {code: geo for geo, code in REGION_CODES.items()}
 
     MONITOR_HEADER = 'User-Agent: Dyn Monitor'
     MONITOR_TIMEOUT = 10
@@ -261,8 +265,7 @@ class DynProvider(BaseProvider):
 
     @property
     def SUPPORTS_DYNAMIC(self):
-        # TODO: dynamic
-        return False
+        return True
 
     def _check_dyn_sess(self):
         # We don't have to worry about locking for the check since the
@@ -394,62 +397,179 @@ class DynProvider(BaseProvider):
         tds = defaultdict(dict)
         for td in get_all_dsf_services():
             try:
-                fqdn, _type = td.label.split(':', 1)
-            except ValueError as e:
-                self.log.warn("Failed to load TrafficDirector '%s': %s",
-                              td.label, e.message)
-                continue
+                _, fqdn, _type = td.label.split(':', 2)
+            except ValueError:
+                try:
+                    fqdn, _type = td.label.split(':', 1)
+                except ValueError:
+                    self.log.warn("Unsupported TrafficDirector '%s'",
+                                  td.label)
+                    continue
             tds[fqdn][_type] = td
         self._traffic_directors = dict(tds)
+        pprint(self._traffic_directors)
 
         return self._traffic_directors
 
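+    # Traffic Director labels double as octodns metadata; the convention
+    # (ours, not anything the Dyn API mandates) is roughly:
+    #
+    #   geo records:     '<fqdn>:<type>',        e.g. 'some.fqdn.:A'
+    #   dynamic records: 'dynamic:<fqdn>:<type>'
+    #
+    # which is why the traffic_directors property above tries the 3-way
+    # split first and falls back to the 2-way one.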
+    def _populate_geo_traffic_director(self, zone, fqdn, _type, td, lenient):
+        # critical to call rulesets once, each call loads them :-(
+        rulesets = td.rulesets
+
+        # We start out with something that will always show a change in case
+        # this is a busted TD. That will prevent us from creating a duplicate
+        # td. We'll overwrite this with real data provided we have it.
+        geo = {}
+        data = {
+            'geo': geo,
+            'type': _type,
+            'ttl': td.ttl,
+            'values': ['0.0.0.0']
+        }
+        for ruleset in rulesets:
+            try:
+                record_set = ruleset.response_pools[0].rs_chains[0] \
+                    .record_sets[0]
+            except IndexError:
+                # problems indicate a malformed ruleset, ignore it
+                continue
+            if ruleset.label.startswith('default:'):
+                data_for = getattr(self, '_data_for_{}'.format(_type))
+                data.update(data_for(_type, record_set.records))
+            else:
+                # We've stored the geo in the label
+                try:
+                    code, _ = ruleset.label.split(':', 1)
+                except ValueError:
+                    continue
+                values = [r.address for r in record_set.records]
+                geo[code] = values
+
+        name = zone.hostname_from_fqdn(fqdn)
+        record = Record.new(zone, name, data, source=self)
+        zone.add_record(record, lenient=lenient)
+
+        return record
+
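+    # A sketch of the shape _populate_dynamic_traffic_director below builds
+    # up (the values here are illustrative, not from a real TD):
+    #
+    #   {
+    #       'dynamic': {
+    #           'pools': {
+    #               'pool-label': {
+    #                   'values': [{'value': '1.2.3.4', 'weight': 1}],
+    #                   'fallback': 'other-pool-label',  # optional
+    #               },
+    #           },
+    #           'rules': [
+    #               {'pool': 'pool-label', 'geos': ['OC', 'EU-FR']},
+    #           ],
+    #       },
+    #       'type': 'A',
+    #       'ttl': 300,
+    #       'values': ['0.0.0.0'],
+    #   }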
+    def _populate_dynamic_traffic_director(self, zone, fqdn, _type, td,
+                                           lenient):
+        # critical to call rulesets once, each call loads them :-(
+        rulesets = td.rulesets
+
+        # We start out with something that will always show a change in case
+        # this is a busted TD. That will prevent us from creating a duplicate
+        # td. We'll overwrite this with real data provided we have it.
+        pools = {}
+        rules = []
+        values = []
+        data = {
+            'dynamic': {
+                'pools': pools,
+                'rules': rules,
+            },
+            'type': _type,
+            'ttl': td.ttl,
+            'values': values,
+        }
+        for ruleset in rulesets:
+            try:
+                record_set = ruleset.response_pools[0].rs_chains[0] \
+                    .record_sets[0]
+            except IndexError:
+                # problems indicate a malformed ruleset, ignore it
+                self.log.warn('_populate_dynamic_traffic_director: '
+                              'malformed ruleset "%s", ignoring',
+                              ruleset.label)
+                continue
+
+            pprint({
+                'ruleset': ruleset,
+                'ruleset.response_pools': ruleset.response_pools,
+                'ruleset.label': ruleset.label,
+                'ruleset.criteria_type': ruleset.criteria_type,
+                'ruleset.criteria': ruleset.criteria,
+                # 'records': [r.__dict__ for r in record_set.records],
+            })
+
+            if ruleset.label.startswith('default:'):
+                data_for = getattr(self, '_data_for_{}'.format(_type))
+                data.update(data_for(_type, record_set.records))
+            else:
+                response_pool = ruleset.response_pools[0]
+                rule = {
+                    'pool': response_pool.label,
+                }
+
+                label = response_pool.label
+                if label not in pools:
+                    # First time we've seen it, get its data
+                    pool = {
+                        'values': [{
+                            'value': r.address,
+                            'weight': r.weight,
+                        } for r in record_set.records]
+                    }
+
+                    try:
+                        pool['fallback'] = ruleset.response_pools[1].label
+                    except IndexError:
+                        pass
+
+                    pools[label] = pool
+
+                criteria_type = ruleset.criteria_type
+                if criteria_type == 'geoip':
+                    # Geo
+                    geo = ruleset.criteria['geoip']
+                    geos = []
+                    # TODO: we need to reconstitute geos here :-/
+                    for code in geo.get('country', []):
+                        geos.append(code)
+                    for code in geo.get('province', []):
+                        geos.append(code)
+                    for code in geo.get('region', []):
+                        geos.append(self.REGION_CODES_LOOKUP[int(code)])
+                    rule['geos'] = geos
+                elif criteria_type == 'always':
+                    pass
+                else:
+                    self.log.warn('_populate_dynamic_traffic_director: '
+                                  'unsupported criteria_type "%s", '
+                                  'ignoring', criteria_type)
+                    continue
+
+                rules.append(rule)
+
+        pprint(data)
+
+        name = zone.hostname_from_fqdn(fqdn)
+        record = Record.new(zone, name, data, source=self, lenient=lenient)
+        zone.add_record(record, lenient=lenient)
+
+        return record
+
     def _populate_traffic_directors(self, zone, lenient):
-        self.log.debug('_populate_traffic_directors: zone=%s', zone.name)
+        self.log.debug('_populate_traffic_directors: zone=%s, lenient=%s',
+                       zone.name, lenient)
         td_records = set()
         for fqdn, types in self.traffic_directors.items():
             # TODO: skip subzones
             if not fqdn.endswith(zone.name):
                 continue
             for _type, td in types.items():
-                # critical to call rulesets once, each call loads them :-(
-                rulesets = td.rulesets
-
-                # We start out with something that will always change show
-                # change in case this is a busted TD. This will prevent us from
-                # creating a duplicate td. We'll overwrite this with real data
-                # provide we have it
-                geo = {}
-                data = {
-                    'geo': geo,
-                    'type': _type,
-                    'ttl': td.ttl,
-                    'values': ['0.0.0.0']
-                }
-                for ruleset in rulesets:
-                    try:
-                        record_set = ruleset.response_pools[0].rs_chains[0] \
-                            .record_sets[0]
-                    except IndexError:
-                        # problems indicate a malformed ruleset, ignore it
-                        continue
-                    _type = record_set.rdata_class
-                    if ruleset.label.startswith('default:'):
-                        data_for = getattr(self, '_data_for_{}'.format(_type))
-                        data.update(data_for(_type, record_set.records))
-                    else:
-                        # We've stored the geo in label
-                        try:
-                            code, _ = ruleset.label.split(':', 1)
-                        except ValueError:
-                            continue
-                        values = [r.address for r in record_set.records]
-                        geo[code] = values
-
-                name = zone.hostname_from_fqdn(fqdn)
-                record = Record.new(zone, name, data, source=self)
-                zone.add_record(record, lenient=lenient)
+                if td.label.startswith('dynamic:'):
+                    record = \
+                        self._populate_dynamic_traffic_director(zone, fqdn,
+                                                                _type, td,
+                                                                lenient)
+                else:
+                    record = \
+                        self._populate_geo_traffic_director(zone, fqdn, _type,
+                                                            td, lenient)
                 td_records.add(record)
 
         return td_records
 
@@ -659,30 +779,56 @@ class DynProvider(BaseProvider):
             self._traffic_director_monitors[label] = monitor
         return monitor
 
-    def _find_or_create_pool(self, td, pools, label, _type, values,
-                             monitor_id=None):
+    def _find_or_create_pool(self, td, pools, label, _type, values=[],
+                             monitor_id=None, record_extras={}):
+
+        # TODO: move this somewhere better
+        def weighted_keyer(d):
+            return d['value']
+
+        # sort a copy, not the caller's list (or the shared default)
+        values = sorted(values, key=weighted_keyer)
+
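+        # An existing pool only matches if both its label and its complete
+        # weighted value set line up; sorting both sides with the same key
+        # makes the comparison order-insensitive, so e.g.
+        #   [{'value': '1.2.3.4', 'weight': 1},
+        #    {'value': '1.2.3.5', 'weight': 2}]
+        # compares equal no matter what order Dyn returns the records in.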
+        print('*** looking for {}'.format(label))
         for pool in pools:
             if pool.label != label:
+                print('  != {}'.format(pool.label))
+                continue
+            print('  == {}'.format(pool.label))
+            try:
+                records = pool.rs_chains[0].record_sets[0].records
+            except IndexError:
+                # No values, can't match
                 continue
-            records = pool.rs_chains[0].record_sets[0].records
-            record_values = sorted([r.address for r in records])
+            record_values = [{
+                'weight': r.weight,
+                'value': r.address,
+            } for r in records]
+            record_values.sort(key=weighted_keyer)
+            pprint(record_values)
             if record_values == values:
+                print('  match {} == {}'.format(record_values, values))
                 # it's a match
                 return pool
+            print('  not match {} != {}'.format(record_values, values))
+
         # we need to create the pool
         _class = {
             'A': DSFARecord,
-            'AAAA': DSFAAAARecord
+            'AAAA': DSFAAAARecord,
+            'CNAME': DSFCNAMERecord,
         }[_type]
-        records = [_class(v) for v in values]
-        record_set = DSFRecordSet(_type, label, serve_count=len(records),
+        records = [_class(v['value'], weight=v.get('weight', 1),
+                          **record_extras)
+                   for v in values]
+        record_set = DSFRecordSet(_type, label,
+                                  serve_count=min(len(records), 2),
                                   records=records, dsf_monitor_id=monitor_id)
         chain = DSFFailoverChain(label, record_sets=[record_set])
         pool = DSFResponsePool(label, rs_chains=[chain])
         pool.create(td)
         return pool
 
-    def _mod_rulesets(self, td, change):
+    def _mod_geo_rulesets(self, td, change):
         new = change.new
 
         # Response Pools
@@ -732,7 +878,7 @@
             int(r._ordering) for r in existing_rulesets
         ] + [-1]) + 1
-        self.log.debug('_mod_rulesets: insert_at=%d', insert_at)
+        self.log.debug('_mod_geo_rulesets: insert_at=%d', insert_at)
 
         # add the default
         label = 'default:{}'.format(uuid4().hex)
@@ -811,7 +957,7 @@
         node = DSFNode(new.zone.name, fqdn)
         td = TrafficDirector(label, ttl=new.ttl, nodes=[node], publish='Y')
         self.log.debug('_mod_geo_Create: td=%s', td.service_id)
-        self._mod_rulesets(td, change)
+        self._mod_geo_rulesets(td, change)
         self.traffic_directors[fqdn] = {
             _type: td
         }
@@ -832,7 +978,7 @@
             self._mod_geo_Create(dyn_zone, change)
             self._mod_Delete(dyn_zone, change)
             return
-        self._mod_rulesets(td, change)
+        self._mod_geo_rulesets(td, change)
 
     def _mod_geo_Delete(self, dyn_zone, change):
         existing = change.existing
@@ -841,6 +987,195 @@
             fqdn_tds[_type].delete()
             del fqdn_tds[_type]
 
+    def _mod_dynamic_rulesets(self, td, change):
+        new = change.new
+
+        # Get existing pools. This should be simple, but it's not b/c the dyn
+        # api is a POS. We need all response pools so we can GC and check to
+        # make sure that what we're after doesn't already exist.
+        # td.all_response_pools just returns thin objects that don't include
+        # their rs_chains (and children down to actual records.) We could
+        # just foreach over those turning them into full DSFResponsePool
+        # objects with get_response_pool, but that'd be N round-trips. We can
+        # avoid those round trips in cases where the pools are in use in
+        # rules, where they're already full objects.
+
+        # First up, populate all the pools we have under rules; the _ prevents
+        # a td.refresh we don't need :-( seriously?
+        existing_rulesets = td._rulesets
+        pools = {}
+        for ruleset in existing_rulesets:
+            for pool in ruleset.response_pools:
+                pools[pool.response_pool_id] = pool
+
+        # Reverse sort the existing_rulesets by _ordering so that we'll
+        # remove them in that order later. This ensures that we remove the
+        # old default before any of the old geo rules, preventing it from
+        # catching everything.
+        existing_rulesets.sort(key=lambda r: r._ordering, reverse=True)
+
+        # Add in any pools that aren't currently referenced by rules
+        for pool in td.all_response_pools:
+            rpid = pool.response_pool_id
+            if rpid not in pools:
+                # we want this one, but it's thin, inflate it
+                pools[rpid] = get_response_pool(rpid, td)
+        # now that we have full objects for the complete set of existing
+        # pools, a list will be more useful
+        pprint({
+            'pools': pools,
+        })
+        pools = pools.values()
+
+        # Rulesets
+
+        # We need to make sure and insert the new rules after any existing
+        # rules so they won't take effect before we've had a chance to add
+        # response pools to them. I've tried both publish=False (which is
+        # completely broken in the client) and creating the rulesets with
+        # response_pool_ids, neither of which appear to work from the client
+        # library. If there are no existing rulesets fall back to 0.
+        insert_at = max([
+            int(r._ordering)
+            for r in existing_rulesets
+        ] + [-1]) + 1
+        self.log.debug('_mod_dynamic_rulesets: insert_at=%d', insert_at)
+
+        # Add the base record values as the ultimate/un-healthchecked default
+        label = 'default:{}'.format(uuid4().hex)
+        ruleset = DSFRuleset(label, 'always', [])
+        ruleset.create(td, index=insert_at)
+        values = [{
+            'value': v,
+            'weight': 1,
+        } for v in new.values]
+        # For these defaults we need to set them to always be served and to
+        # ignore any health checking (since they won't have one)
+        pool = self._find_or_create_pool(td, pools, 'default', new._type,
+                                         values, record_extras={
+                                             'automation': 'manual',
+                                             'eligible': True,
+                                         })
+        # There's no way in the client lib to create a ruleset with an
+        # existing pool (ref'd by id) so we have to do this roundabout dance.
+        active_pools = {
+            # TODO: disallow default as a pool id
+            'default': pool.response_pool_id,
+        }
+        ruleset.add_response_pool(pool.response_pool_id)
+
+        # Get our monitor
+        monitor_id = self._traffic_director_monitor(new).dsf_monitor_id
+
+        # Make sure we have all the pools we're going to need
+        for _id, pool in sorted(new.dynamic.pools.items()):
+            pprint({
+                'pool': pool,
+            })
+            values = [{
+                'weight': v.get('weight', 1),
+                'value': v['value'],
+            } for v in pool.data['values']]
+            pool = self._find_or_create_pool(td, pools, _id, new._type,
+                                             values, monitor_id)
+            active_pools[_id] = pool.response_pool_id
+
+        # Run through and configure our rules
+        for rule_num, rule in enumerate(reversed(new.dynamic.rules)):
+            criteria = defaultdict(lambda: defaultdict(list))
+            criteria_type = 'always'
+            try:
+                for geo in rule.data['geos']:
+                    geo = new.geo_parse(geo)
+                    pprint(geo)
+                    criteria_type = 'geoip'
+                    if geo['subdivision_code']:
+                        criteria['geoip']['province'] \
+                            .append(geo['subdivision_code'].lower())
+                    elif geo['country_code']:
+                        criteria['geoip']['country'] \
+                            .append(geo['country_code'])
+                    else:
+                        criteria['geoip']['region'] \
+                            .append(self.REGION_CODES[geo['continent_code']])
+            except KeyError:
+                # no geos on this rule, leave it as an 'always'
+                pass
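+
+            # A sketch of the criteria built above, e.g. for
+            # geos=['OC', 'EU-FR', 'NA-US-KY'] (16 being REGION_CODES['OC']):
+            #   {'geoip': {'region': [16],
+            #              'country': ['FR'],
+            #              'province': ['ky']}}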
+            pprint(criteria)
+
+            label = '{}:{}'.format(rule_num, uuid4().hex)
+            ruleset = DSFRuleset(label, criteria_type, [], criteria)
+            # Some things you have to call create on yourself, for others
+            # the constructor does it
+            ruleset.create(td, index=insert_at)
+
+            # Add the primary pool for this rule
+            rule_pool = rule.data['pool']
+            ruleset.add_response_pool(active_pools[rule_pool])
+
+            # OK, we have the rule and its primary pool set up, now look to
+            # see if there's a fallback chain that needs to be configured
+            fallback = new.dynamic.pools[rule_pool].data.get('fallback',
+                                                             None)
+            while fallback:
+                # looking at the client lib code, an index past the end
+                # appends
+                ruleset.add_response_pool(active_pools[fallback], index=999)
+                fallback = new.dynamic.pools[fallback].data.get('fallback',
+                                                                None)
+
+            # and always add default as the last
+            ruleset.add_response_pool(active_pools['default'], index=999)
+
+        # we're done with active_pools as a lookup, convert it into a set of
+        # the ids in use
+        active_pools = set(active_pools.values())
+        # Clean up unused response_pools
+        for pool in pools:
+            if pool.response_pool_id in active_pools:
+                continue
+            pool.delete()
+
+        # Clean out the old rulesets
+        for ruleset in existing_rulesets:
+            ruleset.delete()
+
+    def _mod_dynamic_Create(self, dyn_zone, change):
+        new = change.new
+        fqdn = new.fqdn
+        _type = new._type
+        label = 'dynamic:{}:{}'.format(fqdn, _type)
+        node = DSFNode(new.zone.name, fqdn)
+        td = TrafficDirector(label, ttl=new.ttl, nodes=[node], publish='Y')
+        self.log.debug('_mod_dynamic_Create: td=%s', td.service_id)
+        self._mod_dynamic_rulesets(td, change)
+        self.traffic_directors[fqdn] = {
+            _type: td
+        }
+
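+    # Updates have to handle transitions between record flavors, roughly:
+    #
+    #   dynamic -> dynamic:     reconfigure the existing TD's rulesets
+    #   dynamic -> geo:         create the geo TD, delete the dynamic one
+    #   dynamic -> plain:       create the regular record, delete the TD
+    #   non-dynamic -> dynamic: create the dynamic TD, delete the old bits
+    #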
+    def _mod_dynamic_Update(self, dyn_zone, change):
+        new = change.new
+        if not new.dynamic:
+            if new.geo:
+                # New record is a geo record
+                self._mod_geo_Create(dyn_zone, change)
+            else:
+                # New record doesn't have dynamic, we're going from a TD to a
+                # regular record
+                self._mod_Create(dyn_zone, change)
+            self._mod_dynamic_Delete(dyn_zone, change)
+            return
+        try:
+            td = self.traffic_directors[new.fqdn][new._type]
+        except KeyError:
+            # There's no td, this is actually a create, we must be going from
+            # a non-dynamic to a dynamic record. Create the dynamic record,
+            # which configures its rulesets as well
+            self._mod_dynamic_Create(dyn_zone, change)
+            if change.existing.geo:
+                # From a geo, so remove the old geo
+                self._mod_geo_Delete(dyn_zone, change)
+            else:
+                # From a generic, so remove the old generic
+                self._mod_Delete(dyn_zone, change)
+            return
+        # Make sure the details are correct
+        self._mod_dynamic_rulesets(td, change)
+
     def _mod_Create(self, dyn_zone, change):
         new = change.new
         kwargs_for = getattr(self, '_kwargs_for_{}'.format(new._type))
@@ -866,9 +1201,14 @@
         self.log.debug('_apply_traffic_directors: zone=%s', desired.name)
         unhandled_changes = []
         for c in changes:
-            # we only mess with changes that have geo info somewhere
-            if getattr(c.new, 'geo', False) or getattr(c.existing, 'geo',
-                                                       False):
+            pprint(c)
+            # we only mess with changes that have dynamic or geo info
+            # somewhere
+            if getattr(c.new, 'dynamic', False) or \
+                    getattr(c.existing, 'dynamic', False):
+                mod = getattr(self,
+                              '_mod_dynamic_{}'.format(c.__class__.__name__))
+                mod(dyn_zone, c)
+            elif getattr(c.new, 'geo', False) or getattr(c.existing, 'geo',
+                                                         False):
                 mod = getattr(self, '_mod_geo_{}'.format(c.__class__.__name__))
                 mod(dyn_zone, c)
             else:
diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py
index a198cc5..68dd501 100644
--- a/octodns/record/__init__.py
+++ b/octodns/record/__init__.py
@@ -533,6 +533,7 @@
         else:
             seen_default = False
 
+        # TODO: warn or error on unused pools?
         for i, rule in enumerate(rules):
             rule_num = i + 1
             try:
@@ -567,6 +568,11 @@
 
         return reasons
 
+    @classmethod
+    def geo_parse(cls, code):
+        match = cls.geo_re.match(code)
+        return match.groupdict()
+
     def __init__(self, zone, name, data, *args, **kwargs):
         super(_DynamicMixin, self).__init__(zone, name, data, *args,
                                             **kwargs)