diff --git a/octodns/processor/filter.py b/octodns/processor/filter.py index f3aabf5..5bfaa36 100644 --- a/octodns/processor/filter.py +++ b/octodns/processor/filter.py @@ -9,6 +9,8 @@ from __future__ import ( unicode_literals, ) +from re import compile as re_compile + from .base import BaseProcessor @@ -19,8 +21,8 @@ class TypeAllowlistFilter(BaseProcessor): processors: only-a-and-aaaa: - class: octodns.processor.filter.TypeRejectlistFilter - rejectlist: + class: octodns.processor.filter.TypeAllowlistFilter + allowlist: - A - AAAA @@ -35,7 +37,7 @@ class TypeAllowlistFilter(BaseProcessor): ''' def __init__(self, name, allowlist): - super(TypeAllowlistFilter, self).__init__(name) + super().__init__(name) self.allowlist = set(allowlist) def _process(self, zone, *args, **kwargs): @@ -71,7 +73,7 @@ class TypeRejectlistFilter(BaseProcessor): ''' def __init__(self, name, rejectlist): - super(TypeRejectlistFilter, self).__init__(name) + super().__init__(name) self.rejectlist = set(rejectlist) def _process(self, zone, *args, **kwargs): @@ -83,3 +85,114 @@ class TypeRejectlistFilter(BaseProcessor): process_source_zone = _process process_target_zone = _process + + +class _NameBaseFilter(BaseProcessor): + def __init__(self, name, _list): + super().__init__(name) + exact = set() + regex = [] + for pattern in _list: + if pattern.startswith('/'): + regex.append(re_compile(pattern[1:-1])) + else: + exact.add(pattern) + self.exact = exact + self.regex = regex + + +class NameAllowlistFilter(_NameBaseFilter): + '''Only manage records with names that match the provider patterns + + Example usage: + + processors: + only-these: + class: octodns.processor.filter.NameAllowlistFilter + allowlist: + # exact string match + - www + # contains/substring match + - /substring/ + # regex pattern match + - /some-pattern-\\d\\+/ + # regex - anchored so has to match start to end + - /^start-.+-end$/ + + zones: + exxampled.com.: + sources: + - config + processors: + - only-these + targets: + - route53 + ''' + + def __init__(self, name, allowlist): + super().__init__(name, allowlist) + + def _process(self, zone, *args, **kwargs): + for record in zone.records: + name = record.name + if name in self.exact: + continue + + if any(r.search(name) for r in self.regex): + continue + + zone.remove_record(record) + + return zone + + process_source_zone = _process + process_target_zone = _process + + +class NameRejectlistFilter(_NameBaseFilter): + '''Reject managing records with names that match the provider patterns + + Example usage: + + processors: + not-these: + class: octodns.processor.filter.NameRejectlistFilter + rejectlist: + # exact string match + - www + # contains/substring match + - /substring/ + # regex pattern match + - /some-pattern-\\d\\+/ + # regex - anchored so has to match start to end + - /^start-.+-end$/ + + zones: + exxampled.com.: + sources: + - config + processors: + - not-these + targets: + - route53 + ''' + + def __init__(self, name, rejectlist): + super().__init__(name, rejectlist) + + def _process(self, zone, *args, **kwargs): + for record in zone.records: + name = record.name + if name in self.exact: + zone.remove_record(record) + continue + + for regex in self.regex: + if regex.search(name): + zone.remove_record(record) + break + + return zone + + process_source_zone = _process + process_target_zone = _process diff --git a/tests/test_octodns_processor_filter.py b/tests/test_octodns_processor_filter.py index 859677d..aa6f5ff 100644 --- a/tests/test_octodns_processor_filter.py +++ b/tests/test_octodns_processor_filter.py @@ -11,7 +11,12 @@ from __future__ import ( from unittest import TestCase -from octodns.processor.filter import TypeAllowlistFilter, TypeRejectlistFilter +from octodns.processor.filter import ( + NameAllowlistFilter, + NameRejectlistFilter, + TypeAllowlistFilter, + TypeRejectlistFilter, +) from octodns.record import Record from octodns.zone import Zone @@ -76,3 +81,83 @@ class TestTypeRejectListFilter(TestCase): filter_a_aaaa = TypeRejectlistFilter('not-a-aaaa', set(('A', 'AAAA'))) got = filter_a_aaaa.process_target_zone(zone.copy()) self.assertEqual(['txt', 'txt2'], sorted([r.name for r in got.records])) + + +class TestNameAllowListFilter(TestCase): + zone = Zone('unit.tests.', []) + matches = Record.new( + zone, 'matches', {'type': 'A', 'ttl': 42, 'value': '1.2.3.4'} + ) + zone.add_record(matches) + doesnt = Record.new( + zone, 'doesnt', {'type': 'A', 'ttl': 42, 'value': '2.3.4.5'} + ) + zone.add_record(doesnt) + matchable1 = Record.new( + zone, 'start-f43ad96-end', {'type': 'A', 'ttl': 42, 'value': '3.4.5.6'} + ) + zone.add_record(matchable1) + matchable2 = Record.new( + zone, 'start-a3b444c-end', {'type': 'A', 'ttl': 42, 'value': '4.5.6.7'} + ) + zone.add_record(matchable2) + + def test_exact(self): + allows = NameAllowlistFilter('exact', ('matches',)) + + self.assertEqual(4, len(self.zone.records)) + filtered = allows.process_source_zone(self.zone.copy()) + self.assertEqual(1, len(filtered.records)) + self.assertEqual(['matches'], [r.name for r in filtered.records]) + + def test_regex(self): + allows = NameAllowlistFilter('exact', ('/^start-.+-end$/',)) + + self.assertEqual(4, len(self.zone.records)) + filtered = allows.process_source_zone(self.zone.copy()) + self.assertEqual(2, len(filtered.records)) + self.assertEqual( + ['start-a3b444c-end', 'start-f43ad96-end'], + sorted([r.name for r in filtered.records]), + ) + + +class TestNameRejectListFilter(TestCase): + zone = Zone('unit.tests.', []) + matches = Record.new( + zone, 'matches', {'type': 'A', 'ttl': 42, 'value': '1.2.3.4'} + ) + zone.add_record(matches) + doesnt = Record.new( + zone, 'doesnt', {'type': 'A', 'ttl': 42, 'value': '2.3.4.5'} + ) + zone.add_record(doesnt) + matchable1 = Record.new( + zone, 'start-f43ad96-end', {'type': 'A', 'ttl': 42, 'value': '3.4.5.6'} + ) + zone.add_record(matchable1) + matchable2 = Record.new( + zone, 'start-a3b444c-end', {'type': 'A', 'ttl': 42, 'value': '4.5.6.7'} + ) + zone.add_record(matchable2) + + def test_exact(self): + rejects = NameRejectlistFilter('exact', ('matches',)) + + self.assertEqual(4, len(self.zone.records)) + filtered = rejects.process_source_zone(self.zone.copy()) + self.assertEqual(3, len(filtered.records)) + self.assertEqual( + ['doesnt', 'start-a3b444c-end', 'start-f43ad96-end'], + sorted([r.name for r in filtered.records]), + ) + + def test_regex(self): + rejects = NameRejectlistFilter('exact', ('/^start-.+-end$/',)) + + self.assertEqual(4, len(self.zone.records)) + filtered = rejects.process_source_zone(self.zone.copy()) + self.assertEqual(2, len(filtered.records)) + self.assertEqual( + ['doesnt', 'matches'], sorted([r.name for r in filtered.records]) + )