From da818d12e43c7f35d9183e7d4d93a1d1a0d7ae59 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 6 Oct 2023 16:14:38 -0700 Subject: [PATCH] ZoneNameFilter to error/ignore when record names end with the zone name --- CHANGELOG.md | 5 ++ octodns/processor/filter.py | 51 +++++++++++++++++++ tests/test_octodns_processor_filter.py | 69 ++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c4dff5..d0ab5d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v1.3.0 - 2023-??-?? - ??? + +* Added ZoneNameFilter processor to enable ignoring/alerting on type-os like + octodns.com.octodns.com + ## v1.2.1 - 2023-09-29 - Now with fewer stale files * Update script/release to do clean room dist builds diff --git a/octodns/processor/filter.py b/octodns/processor/filter.py index 078fcf5..e02bc84 100644 --- a/octodns/processor/filter.py +++ b/octodns/processor/filter.py @@ -4,6 +4,7 @@ from re import compile as re_compile +from ..record.exception import ValidationError from .base import BaseProcessor @@ -215,3 +216,53 @@ class IgnoreRootNsFilter(BaseProcessor): process_source_zone = _process process_target_zone = _process + + +class ZoneNameFilter(BaseProcessor): + '''Filter or error on record names that contain the zone name + + Example usage: + + processors: + zone-name: + class: octodns.processor.filter.ZoneNameFilter + # If true a ValidationError will be throw when such records are + # encouterd, if false the records will just be ignored/omitted. + # (default: false) + + zones: + exxampled.com.: + sources: + - config + processors: + - zone-name + targets: + - azure + ''' + + def __init__(self, name, error=False): + super().__init__(name) + self.error = error + + def _process(self, zone, *args, **kwargs): + zone_name_with_dot = zone.name + zone_name_without_dot = zone_name_with_dot[:-1] + for record in zone.records: + name = record.name + if name.endswith(zone_name_with_dot) or name.endswith( + zone_name_without_dot + ): + if self.error: + raise ValidationError( + record.fqdn, + ['record name ends with zone name'], + record.context, + ) + else: + # just remove it + zone.remove_record(record) + + return zone + + process_source_zone = _process + process_target_zone = _process diff --git a/tests/test_octodns_processor_filter.py b/tests/test_octodns_processor_filter.py index a18eb51..0a1b6dd 100644 --- a/tests/test_octodns_processor_filter.py +++ b/tests/test_octodns_processor_filter.py @@ -10,8 +10,10 @@ from octodns.processor.filter import ( NameRejectlistFilter, TypeAllowlistFilter, TypeRejectlistFilter, + ZoneNameFilter, ) from octodns.record import Record +from octodns.record.exception import ValidationError from octodns.zone import Zone zone = Zone('unit.tests.', []) @@ -180,3 +182,70 @@ class TestIgnoreRootNsFilter(TestCase): [('A', ''), ('NS', 'sub')], sorted([(r._type, r.name) for r in filtered.records]), ) + + +class TestZoneNameFilter(TestCase): + def test_ends_with_zone(self): + zone_name_filter = ZoneNameFilter('zone-name') + + zone = Zone('unit.tests.', []) + + # something that doesn't come into play + zone.add_record( + Record.new( + zone, 'www', {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'} + ) + ) + + # something that has the zone name, but doesn't end with it + zone.add_record( + Record.new( + zone, + f'{zone.name}more', + {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'}, + ) + ) + + self.assertEqual(2, len(zone.records)) + filtered = zone_name_filter.process_source_zone(zone.copy()) + # get everything back + self.assertEqual(2, len(filtered.records)) + + with_dot = zone.copy() + with_dot.add_record( + Record.new( + zone, zone.name, {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'} + ) + ) + self.assertEqual(3, len(with_dot.records)) + filtered = zone_name_filter.process_source_zone(with_dot.copy()) + # don't get the one that ends with the zone name + self.assertEqual(2, len(filtered.records)) + + without_dot = zone.copy() + without_dot.add_record( + Record.new( + zone, + zone.name[:-1], + {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'}, + ) + ) + self.assertEqual(3, len(without_dot.records)) + filtered = zone_name_filter.process_source_zone(without_dot.copy()) + # don't get the one that ends with the zone name + self.assertEqual(2, len(filtered.records)) + + def test_error(self): + errors = ZoneNameFilter('zone-name', error=True) + + zone = Zone('unit.tests.', []) + zone.add_record( + Record.new( + zone, zone.name, {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'} + ) + ) + with self.assertRaises(ValidationError) as ctx: + errors.process_source_zone(zone) + self.assertEqual( + ['record name ends with zone name'], ctx.exception.reasons + )