diff --git a/octodns/processor/filter.py b/octodns/processor/filter.py index 2de3b23..db14c85 100644 --- a/octodns/processor/filter.py +++ b/octodns/processor/filter.py @@ -2,6 +2,8 @@ # # +from ipaddress import ip_address, ip_network +from itertools import product from logging import getLogger from re import compile as re_compile @@ -125,6 +127,35 @@ class _NameBaseFilter(BaseProcessor): process_target_zone = _process +class _NetworkValueBaseFilter(BaseProcessor): + def __init__(self, name, _list): + super().__init__(name) + self.networks = [] + for value in _list: + try: + self.networks.append(ip_network(value)) + except ValueError: + raise ValueError(f'{value} is not a valid CIDR to use') + + def _process(self, zone, *args, **kwargs): + for record in zone.records: + if record._type not in ['A', 'AAAA']: + continue + + if any( + ip_address(value) in network + for value, network in product(record.values, self.networks) + ): + self.matches(zone, record) + else: + self.doesnt_match(zone, record) + + return zone + + process_source_zone = _process + process_target_zone = _process + + class NameAllowlistFilter(_NameBaseFilter, AllowsMixin): '''Only manage records with names that match the provider patterns @@ -189,6 +220,60 @@ class NameRejectlistFilter(_NameBaseFilter, RejectsMixin): super().__init__(name, rejectlist) +class NetworkValueAllowlistFilter(_NetworkValueBaseFilter, AllowsMixin): + '''Only manage records with values that match the provider patterns + + Example usage: + + processors: + only-these: + class: octodns.processor.filter.NetworkValueAllowlistFilter + allowlist: + - 127.0.0.1/32 + - 192.168.0.0/16 + - fd00::/8 + + zones: + exxampled.com.: + sources: + - config + processors: + - only-these + targets: + - route53 + ''' + + def __init__(self, name, allowlist): + super().__init__(name, allowlist) + + +class NetworkValueRejectlistFilter(_NetworkValueBaseFilter, RejectsMixin): + '''Reject managing records with value matching a that match the provider patterns + + Example usage: + + processors: + not-these: + class: octodns.processor.filter.NetworkValueRejectlistFilter + rejectlist: + - 127.0.0.1/32 + - 192.168.0.0/16 + - fd00::/8 + + zones: + exxampled.com.: + sources: + - config + processors: + - not-these + targets: + - route53 + ''' + + def __init__(self, name, rejectlist): + super().__init__(name, rejectlist) + + class IgnoreRootNsFilter(BaseProcessor): '''Do not manage Root NS Records. diff --git a/tests/test_octodns_processor_filter.py b/tests/test_octodns_processor_filter.py index 2d9b881..4ecef63 100644 --- a/tests/test_octodns_processor_filter.py +++ b/tests/test_octodns_processor_filter.py @@ -9,6 +9,8 @@ from octodns.processor.filter import ( IgnoreRootNsFilter, NameAllowlistFilter, NameRejectlistFilter, + NetworkValueAllowlistFilter, + NetworkValueRejectlistFilter, TypeAllowlistFilter, TypeRejectlistFilter, ZoneNameFilter, @@ -161,6 +163,38 @@ class TestNameRejectListFilter(TestCase): ) +class TestNetworkValueFilter(TestCase): + zone = Zone('unit.tests.', []) + matches = Record.new( + zone, 'private-ipv4', {'type': 'A', 'ttl': 42, 'value': '10.42.42.42'} + ) + zone.add_record(matches) + doesnt = Record.new( + zone, 'public-ipv4', {'type': 'A', 'ttl': 42, 'value': '42.42.42.42'} + ) + zone.add_record(doesnt) + matchable1 = Record.new( + zone, 'private-ipv6', {'type': 'AAAA', 'ttl': 42, 'value': 'fd12:3456:789a:1::1'} + ) + zone.add_record(matchable1) + matchable2 = Record.new( + zone, 'public-ipv6', {'type': 'AAAA', 'ttl': 42, 'value': 'dead:beef:cafe::1'} + ) + zone.add_record(matchable2) + + def test_reject(self): + filter_private = NetworkValueRejectlistFilter('rejectlist', set(('10.0.0.0/8', 'fd00::/8'))) + + got = filter_private.process_source_zone(self.zone.copy()) + self.assertEqual(['public-ipv4', 'public-ipv6'], sorted([r.name for r in got.records])) + + def test_allow(self): + filter_private = NetworkValueAllowlistFilter('allowlist', set(('10.0.0.0/8', 'fd00::/8'))) + + got = filter_private.process_source_zone(self.zone.copy()) + self.assertEqual(['private-ipv4', 'private-ipv6'], sorted([r.name for r in got.records])) + + class TestIgnoreRootNsFilter(TestCase): zone = Zone('unit.tests.', []) root = Record.new(