Browse Source

Merge pull request #978 from sjparkinson/spf-processor

Add SPF processor to validate SPF values in TXT records
pull/981/head
Ross McFarland 3 years ago
committed by GitHub
parent
commit
31d373fc8a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 548 additions and 2 deletions
  1. +142
    -0
      octodns/processor/spf.py
  2. +1
    -1
      requirements.txt
  3. +1
    -1
      setup.py
  4. +404
    -0
      tests/test_octodns_processor_spf.py

+ 142
- 0
octodns/processor/spf.py View File

@ -0,0 +1,142 @@
#
#
#
from logging import getLogger
from typing import List, Optional
import dns.resolver
from dns.resolver import Answer
from octodns.record.base import Record
from .base import BaseProcessor, ProcessorException
class SpfValueException(ProcessorException):
pass
class SpfDnsLookupException(ProcessorException):
pass
class SpfDnsLookupProcessor(BaseProcessor):
'''
Validate that SPF values in TXT records are valid.
Example usage:
processors:
spf:
class: octodns.processor.spf.SpfDnsLookupProcessor
zones:
example.com.:
sources:
- config
processors:
- spf
targets:
- route53
The validation can be skipped for specific records by setting the lenient
flag, e.g.
_spf:
octodns:
lenient: true
ttl: 86400
type: TXT
value: v=spf1 ptr ~all
'''
log = getLogger('SpfDnsLookupProcessor')
def __init__(self, name):
self.log.debug(f"SpfDnsLookupProcessor: {name}")
super().__init__(name)
def _get_spf_from_txt_values(
self, record: Record, values: List[str]
) -> Optional[str]:
self.log.debug(
f"_get_spf_from_txt_values: record={record.fqdn} values={values}"
)
# SPF values to validate will begin with 'v=spf1 '
spf = [value for value in values if value.startswith('v=spf1 ')]
# No SPF values in the TXT record
if len(spf) == 0:
return None
# More than one SPF value resolves as "permerror", https://datatracker.ietf.org/doc/html/rfc7208#section-4.5
if len(spf) > 1:
raise SpfValueException(
f"{record.fqdn} has more than one SPF value in the TXT record"
)
return spf[0]
def _process_answer(self, answer: Answer) -> List[str]:
values = []
for value in answer:
text_value = value.to_text()
processed_value = text_value[1:-1].replace('" "', '')
values.append(processed_value)
return values
def _check_dns_lookups(
self, record: Record, values: List[str], lookups: int = 0
) -> int:
self.log.debug(
f"_check_dns_lookups: record={record.fqdn} values={values} lookups={lookups}"
)
spf = self._get_spf_from_txt_values(record, values)
if spf is None:
return lookups
terms = spf[len('v=spf1 ') :].split(' ')
for term in terms:
if lookups > 10:
raise SpfDnsLookupException(
f"{record.fqdn} exceeds the 10 DNS lookup limit in the SPF record"
)
if term.startswith('ptr'):
raise SpfValueException(
f"{record.fqdn} uses the deprecated ptr mechanism"
)
# These mechanisms cost one DNS lookup each
if term.startswith(('a', 'mx', 'exists:', 'redirect', 'include:')):
lookups += 1
# The include mechanism can result in further lookups after resolving the DNS record
if term.startswith('include:'):
domain = term[len('include:') :]
answer = dns.resolver.resolve(domain, 'TXT')
answer_values = self._process_answer(answer)
lookups = self._check_dns_lookups(
record, answer_values, lookups
)
return lookups
def process_source_zone(self, zone, *args, **kwargs):
for record in zone.records:
if record._type != 'TXT':
continue
if record._octodns.get('lenient'):
continue
self._check_dns_lookups(record, record.values, 0)
return zone

+ 1
- 1
requirements.txt View File

@ -1,5 +1,5 @@
PyYAML==6.0
dnspython==2.2.1
dnspython==2.3.0
fqdn==1.5.1
idna==3.4
natsort==8.2.0


+ 1
- 1
setup.py View File

@ -84,7 +84,7 @@ setup(
},
install_requires=(
'PyYaml>=4.2b1',
'dnspython>=1.15.0',
'dnspython>=2.2.1',
'fqdn>=1.5.0',
'idna>=3.3',
'natsort>=5.5.0',


+ 404
- 0
tests/test_octodns_processor_spf.py View File

@ -0,0 +1,404 @@
from unittest import TestCase
from unittest.mock import MagicMock, call, patch
from octodns.processor.spf import (
SpfDnsLookupException,
SpfDnsLookupProcessor,
SpfValueException,
)
from octodns.record.base import Record
from octodns.zone import Zone
class TestSpfDnsLookupProcessor(TestCase):
def test_get_spf_from_txt_values(self):
processor = SpfDnsLookupProcessor('test')
# Used in logging
record = Record.new(
Zone('unit.tests.', []),
'',
{'type': 'TXT', 'ttl': 86400, 'values': ['']},
)
self.assertIsNone(
processor._get_spf_from_txt_values(
record, ['v=DMARC1\\; p=reject\\;']
)
)
self.assertEqual(
'v=spf1 include:example.com ~all',
processor._get_spf_from_txt_values(
record,
['v=DMARC1\\; p=reject\\;', 'v=spf1 include:example.com ~all'],
),
)
with self.assertRaises(SpfValueException):
processor._get_spf_from_txt_values(
record,
[
'v=spf1 include:example.com ~all',
'v=spf1 include:example.com ~all',
],
)
# Missing "all" or "redirect" at the end
self.assertEqual(
'v=spf1 include:example.com',
processor._get_spf_from_txt_values(
record,
['v=spf1 include:example.com', 'v=DMARC1\\; p=reject\\;'],
),
)
self.assertEqual(
'v=spf1 +mx redirect=example.com',
processor._get_spf_from_txt_values(
record,
['v=spf1 +mx redirect=example.com', 'v=DMARC1\\; p=reject\\;'],
),
)
@patch('dns.resolver.resolve')
def test_processor(self, resolver_mock):
processor = SpfDnsLookupProcessor('test')
self.assertEqual('test', processor.name)
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['v=DMARC1\\; p=reject\\;'],
},
)
)
zone.add_record(
Record.new(
zone, '', {'type': 'A', 'ttl': 86400, 'value': '1.2.3.4'}
)
)
self.assertEqual(zone, processor.process_source_zone(zone))
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 a include:example.com ~all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = '"v=spf1 -all"'
resolver_mock.return_value = [txt_value_mock]
self.assertEqual(zone, processor.process_source_zone(zone))
resolver_mock.assert_called_once_with('example.com', 'TXT')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 a ip4:1.2.3.4 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 -all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
self.assertEqual(zone, processor.process_source_zone(zone))
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 a mx exists:example.com a a a a a a a a ~all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
with self.assertRaises(SpfDnsLookupException):
processor.process_source_zone(zone)
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:example.com -all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = (
'"v=spf1 a a a a a a a a a a a -all"'
)
resolver_mock.return_value = [txt_value_mock]
with self.assertRaises(SpfDnsLookupException):
processor.process_source_zone(zone)
resolver_mock.assert_called_once_with('example.com', 'TXT')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:example.com -all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = (
'"v=spf1 ip4:1.2.3.4" " ip4:4.3.2.1 -all"'
)
resolver_mock.return_value = [txt_value_mock]
self.assertEqual(zone, processor.process_source_zone(zone))
resolver_mock.assert_called_once_with('example.com', 'TXT')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:example.com -all',
'v=DMARC1\\; p=reject\\;',
],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
first_txt_value_mock = MagicMock()
first_txt_value_mock.to_text.return_value = (
'"v=spf1 include:_spf.example.com -all"'
)
second_txt_value_mock = MagicMock()
second_txt_value_mock.to_text.return_value = '"v=spf1 a -all"'
resolver_mock.side_effect = [
[first_txt_value_mock],
[second_txt_value_mock],
]
self.assertEqual(zone, processor.process_source_zone(zone))
resolver_mock.assert_has_calls(
[call('example.com', 'TXT'), call('_spf.example.com', 'TXT')]
)
def test_processor_with_long_txt_value(self):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'value': (
'v=spf1 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334'
' ip6:2001:0db8:85a3:0000:0000:8a2e:0370:7334 ~all'
),
},
)
)
self.assertEqual(zone, processor.process_source_zone(zone))
@patch('dns.resolver.resolve')
def test_processor_with_lenient_record(self, resolver_mock):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
lenient = Record.new(
zone,
'lenient',
{
'type': 'TXT',
'ttl': 86400,
'value': 'v=spf1 a a a a a a a a a a a ~all',
'octodns': {'lenient': True},
},
)
zone.add_record(lenient)
self.assertEqual(zone, processor.process_source_zone(zone))
resolver_mock.assert_not_called()
@patch('dns.resolver.resolve')
def test_processor_errors_on_too_many_spf_values(self, resolver_mock):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
record = Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:_spf.google.com ~all',
'v=spf1 include:mailgun.org ~all',
],
},
)
zone.add_record(record)
with self.assertRaises(SpfValueException):
processor.process_source_zone(zone)
resolver_mock.assert_not_called()
@patch('dns.resolver.resolve')
def test_processor_errors_ptr_mechanisms(self, resolver_mock):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{'type': 'TXT', 'ttl': 86400, 'values': ['v=spf1 ptr ~all']},
)
)
with self.assertRaises(SpfValueException) as context:
processor.process_source_zone(zone)
self.assertEqual(
'unit.tests. uses the deprecated ptr mechanism',
str(context.exception),
)
resolver_mock.assert_not_called()
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['v=spf1 ptr:example.com ~all'],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
with self.assertRaises(SpfValueException) as context:
processor.process_source_zone(zone)
self.assertEqual(
'unit.tests. uses the deprecated ptr mechanism',
str(context.exception),
)
resolver_mock.assert_not_called()
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['v=spf1 include:example.com ~all'],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = '"v=spf1 ptr -all"'
resolver_mock.return_value = [txt_value_mock]
with self.assertRaises(SpfValueException) as context:
processor.process_source_zone(zone)
self.assertEqual(
'unit.tests. uses the deprecated ptr mechanism',
str(context.exception),
)
resolver_mock.assert_called_once_with('example.com', 'TXT')
@patch('dns.resolver.resolve')
def test_processor_errors_on_recursive_include_mechanism(
self, resolver_mock
):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['v=spf1 include:example.com ~all'],
},
)
)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = (
'"v=spf1 include:example.com ~all"'
)
resolver_mock.return_value = [txt_value_mock]
with self.assertRaises(SpfDnsLookupException):
processor.process_source_zone(zone)
resolver_mock.assert_called_with('example.com', 'TXT')

Loading…
Cancel
Save