Browse Source

Shim AxfrSource and ZoneFileSource

pull/949/head
Ross McFarland 3 years ago
parent
commit
c6392d798d
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
3 changed files with 25 additions and 305 deletions
  1. +5
    -0
      CHANGELOG.md
  2. +13
    -184
      octodns/source/axfr.py
  3. +7
    -121
      tests/test_octodns_source_axfr.py

+ 5
- 0
CHANGELOG.md View File

@ -1,3 +1,8 @@
## v0.9.21 - 2022-??-?? - ???
* Shim AxfrSource and ZoneFileSource post extraction into
https://github.com/octodns/octodns-bind
## v0.9.20 - 2022-10-05 - International friendly
#### Noteworthy changes


+ 13
- 184
octodns/source/axfr.py View File

@ -2,190 +2,19 @@
#
#
import dns.name
import dns.query
import dns.zone
import dns.rdatatype
from logging import getLogger
from dns.exception import DNSException
from os import listdir
from os.path import join
import logging
from ..record import Record, Rr
from .base import BaseSource
class AxfrBaseSource(BaseSource):
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(
(
'A',
'AAAA',
'CAA',
'CNAME',
'LOC',
'MX',
'NS',
'PTR',
'SPF',
'SRV',
'SSHFP',
'TXT',
)
logger = getLogger('AXFR')
try:
logger.warning(
'octodns_bind shimmed. Update your provider class to octodns_bind.AxfrSource or octodns_bind.ZoneFileSource. Shim will be removed in 1.0'
)
from octodns_bind import AxfrSource, ZoneFileSource
def __init__(self, id):
super().__init__(id)
def populate(self, zone, target=False, lenient=False):
self.log.debug(
'populate: name=%s, target=%s, lenient=%s',
zone.name,
target,
lenient,
)
before = len(zone.records)
rrs = self.zone_records(zone)
for record in Record.from_rrs(zone, rrs, lenient=lenient):
zone.add_record(record, lenient=lenient)
self.log.info(
'populate: found %s records', len(zone.records) - before
)
class AxfrSourceException(Exception):
pass
class AxfrSourceZoneTransferFailed(AxfrSourceException):
def __init__(self):
super().__init__('Unable to Perform Zone Transfer')
class AxfrSource(AxfrBaseSource):
'''
Axfr zonefile importer to import data
axfr:
class: octodns.source.axfr.AxfrSource
# The address of nameserver to perform zone transfer against
master: ns1.example.com
'''
def __init__(self, id, master):
self.log = logging.getLogger(f'AxfrSource[{id}]')
self.log.debug('__init__: id=%s, master=%s', id, master)
super().__init__(id)
self.master = master
def zone_records(self, zone):
try:
z = dns.zone.from_xfr(
dns.query.xfr(self.master, zone.name, relativize=False),
relativize=False,
)
except DNSException:
raise AxfrSourceZoneTransferFailed()
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
if rdtype in self.SUPPORTS:
records.append(Rr(name.to_text(), rdtype, ttl, rdata.to_text()))
return records
class ZoneFileSourceException(Exception):
pass
class ZoneFileSourceNotFound(ZoneFileSourceException):
def __init__(self):
super().__init__('Zone file not found')
class ZoneFileSourceLoadFailure(ZoneFileSourceException):
def __init__(self, error):
super().__init__(str(error))
class ZoneFileSource(AxfrBaseSource):
'''
Bind compatible zone file source
zonefile:
class: octodns.source.axfr.ZoneFileSource
# The directory holding the zone files
# Filenames should match zone name (eg. example.com.)
# with optional extension specified with file_extension
directory: ./zonefiles
# File extension on zone files
# Appended to zone name to locate file
# (optional, default None)
file_extension: zone
# Should sanity checks of the origin node be done
# (optional, default true)
check_origin: false
'''
def __init__(self, id, directory, file_extension='.', check_origin=True):
self.log = logging.getLogger(f'ZoneFileSource[{id}]')
self.log.debug(
'__init__: id=%s, directory=%s, file_extension=%s, '
'check_origin=%s',
id,
directory,
file_extension,
check_origin,
)
super().__init__(id)
self.directory = directory
self.file_extension = file_extension
self.check_origin = check_origin
self._zone_records = {}
def _load_zone_file(self, zone_name):
zone_filename = f'{zone_name[:-1]}{self.file_extension}'
zonefiles = listdir(self.directory)
if zone_filename in zonefiles:
try:
z = dns.zone.from_file(
join(self.directory, zone_filename),
zone_name,
relativize=False,
check_origin=self.check_origin,
)
except DNSException as error:
raise ZoneFileSourceLoadFailure(error)
else:
raise ZoneFileSourceNotFound()
return z
def zone_records(self, zone):
if zone.name not in self._zone_records:
try:
z = self._load_zone_file(zone.name)
except ZoneFileSourceNotFound:
return []
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
if rdtype in self.SUPPORTS:
records.append(
Rr(name.to_text(), rdtype, ttl, rdata.to_text())
)
self._zone_records[zone.name] = records
return self._zone_records[zone.name]
AxfrSource # pragma: no cover
ZoneFileSource # pragma: no cover
except ModuleNotFoundError:
logger.exception(
'AXFR/Zone file support has been moved into a separate module, octodns_bind is now required. Provider classes should be updated to octodns_bind.AxfrSource or octodns_bind.ZoneFileSource. See https://github.com/octodns/octodns#updating-to-use-extracted-providers for more information.'
)
raise

+ 7
- 121
tests/test_octodns_source_axfr.py View File

@ -2,131 +2,17 @@
#
#
import dns.zone
from dns.exception import DNSException
from os.path import exists
from shutil import copyfile
from unittest import TestCase
from unittest.mock import patch
from octodns.source.axfr import (
AxfrSource,
AxfrSourceZoneTransferFailed,
ZoneFileSource,
ZoneFileSourceLoadFailure,
)
from octodns.zone import Zone
from octodns.record import ValidationError
class TestAxfrSource(TestCase):
source = AxfrSource('test', 'localhost')
forward_zonefile = dns.zone.from_file(
'./tests/zones/unit.tests.tst', 'unit.tests', relativize=False
)
reverse_zonefile = dns.zone.from_file(
'./tests/zones/2.0.192.in-addr.arpa.',
'2.0.192.in-addr.arpa',
relativize=False,
)
@patch('dns.zone.from_xfr')
def test_populate_forward(self, from_xfr_mock):
got = Zone('unit.tests.', [])
from_xfr_mock.side_effect = [self.forward_zonefile, DNSException]
self.source.populate(got)
self.assertEqual(16, len(got.records))
with self.assertRaises(AxfrSourceZoneTransferFailed) as ctx:
zone = Zone('unit.tests.', [])
self.source.populate(zone)
self.assertEqual('Unable to Perform Zone Transfer', str(ctx.exception))
@patch('dns.zone.from_xfr')
def test_populate_reverse(self, from_xfr_mock):
got = Zone('2.0.192.in-addr.arpa.', [])
from_xfr_mock.side_effect = [self.reverse_zonefile]
self.source.populate(got)
self.assertEqual(4, len(got.records))
class TestZoneFileSource(TestCase):
source = ZoneFileSource('test', './tests/zones', file_extension='.tst')
def test_zonefiles_with_extension(self):
source = ZoneFileSource('test', './tests/zones', '.extension')
# Load zonefiles with a specified file extension
valid = Zone('ext.unit.tests.', [])
source.populate(valid)
self.assertEqual(1, len(valid.records))
def test_zonefiles_without_extension(self):
# Windows doesn't let files end with a `.` so we add a .tst to them in
# the repo and then try and create the `.` version we need for the
# default case (no extension.)
copyfile('./tests/zones/unit.tests.tst', './tests/zones/unit.tests.')
# Unfortunately copyfile silently works and create the file without
# the `.` so we have to check to see if it did that
if exists('./tests/zones/unit.tests'):
# It did so we need to skip this test, that means windows won't
# have full code coverage, but skipping the test is going out of
# our way enough for a os-specific/oddball case.
self.skipTest(
'Unable to create unit.tests. (ending with .) so '
'skipping default filename testing.'
)
source = ZoneFileSource('test', './tests/zones')
# Load zonefiles without a specified file extension
valid = Zone('unit.tests.', [])
source.populate(valid)
self.assertEqual(16, len(valid.records))
def test_populate(self):
# Valid zone file in directory
valid = Zone('unit.tests.', [])
self.source.populate(valid)
self.assertEqual(16, len(valid.records))
# 2nd populate does not read file again
again = Zone('unit.tests.', [])
self.source.populate(again)
self.assertEqual(16, len(again.records))
# bust the cache
del self.source._zone_records[valid.name]
# No zone file in directory
missing = Zone('missing.zone.', [])
self.source.populate(missing)
self.assertEqual(0, len(missing.records))
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.source.axfr import AxfrSource
# Zone file is not valid
with self.assertRaises(ZoneFileSourceLoadFailure) as ctx:
zone = Zone('invalid.zone.', [])
self.source.populate(zone)
self.assertEqual(
'The DNS zone has no NS RRset at its origin.', str(ctx.exception)
)
AxfrSource
# Records are not to RFC (lenient=False)
with self.assertRaises(ValidationError) as ctx:
zone = Zone('invalid.records.', [])
self.source.populate(zone)
self.assertEqual(
'Invalid record _invalid.invalid.records.\n'
' - invalid name for SRV record',
str(ctx.exception),
)
with self.assertRaises(ModuleNotFoundError):
from octodns.source.axfr import ZoneFileSource
# Records are not to RFC, but load anyhow (lenient=True)
invalid = Zone('invalid.records.', [])
self.source.populate(invalid, lenient=True)
self.assertEqual(12, len(invalid.records))
ZoneFileSource

Loading…
Cancel
Save