Browse Source

Handle chunked values from DNS lookups

pull/978/head
Samuel Parkinson 3 years ago
parent
commit
19e8a27110
2 changed files with 56 additions and 39 deletions
  1. +14
    -7
      octodns/processor/spf.py
  2. +42
    -32
      tests/test_octodns_processor_spf.py

+ 14
- 7
octodns/processor/spf.py View File

@ -2,11 +2,11 @@
# #
# #
import re
from logging import getLogger from logging import getLogger
from typing import Optional from typing import Optional
import dns.resolver import dns.resolver
from dns.resolver import Answer
from octodns.record.base import Record from octodns.record.base import Record
@ -64,23 +64,30 @@ class SpfDnsLookupProcessor(BaseProcessor):
f"_get_spf_from_txt_values: record={record.fqdn} values={values}" f"_get_spf_from_txt_values: record={record.fqdn} values={values}"
) )
# SPF values must begin with 'v=spf1 '
# SPF values to validate will begin with 'v=spf1 '
spf = [value for value in values if value.startswith('v=spf1 ')] spf = [value for value in values if value.startswith('v=spf1 ')]
# No SPF values in the TXT record
if len(spf) == 0: if len(spf) == 0:
return None return None
# More than one SPF value resolves as "permerror", https://datatracker.ietf.org/doc/html/rfc7208#section-4.5
if len(spf) > 1: if len(spf) > 1:
raise SpfValueException( raise SpfValueException(
f"{record.fqdn} has more than one SPF value in the TXT record" f"{record.fqdn} has more than one SPF value in the TXT record"
) )
match = re.search(r"(v=spf1\s.+(?:all|redirect=))", "".join(values))
return spf[0]
if match is None:
raise SpfValueException(f"{record.fqdn} has an invalid SPF value")
def _process_answer(self, answer: Answer) -> list[str]:
values = []
return match.group()
for value in answer:
text_value = value.to_text()
processed_value = text_value[1:-1].replace('" "', '')
values.append(processed_value)
return values
def _check_dns_lookups( def _check_dns_lookups(
self, record: Record, values: list[str], lookups: int = 0 self, record: Record, values: list[str], lookups: int = 0
@ -115,7 +122,7 @@ class SpfDnsLookupProcessor(BaseProcessor):
if term.startswith('include:'): if term.startswith('include:'):
domain = term.removeprefix('include:') domain = term.removeprefix('include:')
answer = dns.resolver.resolve(domain, 'TXT') answer = dns.resolver.resolve(domain, 'TXT')
answer_values = [value.to_text()[1:-1] for value in answer]
answer_values = self._process_answer(answer)
lookups = self._check_dns_lookups( lookups = self._check_dns_lookups(
record, answer_values, lookups record, answer_values, lookups
) )


+ 42
- 32
tests/test_octodns_processor_spf.py View File

@ -13,10 +13,18 @@ from octodns.zone import Zone
class TestSpfDnsLookupProcessor(TestCase): class TestSpfDnsLookupProcessor(TestCase):
def test_get_spf_from_txt_values(self): def test_get_spf_from_txt_values(self):
processor = SpfDnsLookupProcessor('test') processor = SpfDnsLookupProcessor('test')
# Used in logging
record = Record.new( record = Record.new(
Zone('unit.tests.', []), Zone('unit.tests.', []),
'', '',
{'type': 'TXT', 'ttl': 86400, 'values': ['v=DMARC1\; p=reject\;']},
{'type': 'TXT', 'ttl': 86400, 'values': ['']},
)
self.assertIsNone(
processor._get_spf_from_txt_values(
record, ['v=DMARC1\; p=reject\;']
)
) )
self.assertEqual( self.assertEqual(
@ -36,40 +44,16 @@ class TestSpfDnsLookupProcessor(TestCase):
], ],
) )
# Missing "all" or "redirect" at the end
self.assertEqual( self.assertEqual(
'v=spf1 include:example.com ~all',
processor._get_spf_from_txt_values(
record,
['v=DMARC1\; p=reject\;', 'v=spf1 include:example.com ~all'],
),
)
with self.assertRaises(SpfValueException):
processor._get_spf_from_txt_values(
record, ['v=spf1 include:example.com']
)
self.assertIsNone(
processor._get_spf_from_txt_values(
record, ['v=DMARC1\; p=reject\;']
)
)
# SPF record split across multiple character-strings, https://www.rfc-editor.org/rfc/rfc7208#section-3.3
self.assertEqual(
'v=spf1 include:example.com ip4:1.2.3.4 ~all',
'v=spf1 include:example.com',
processor._get_spf_from_txt_values( processor._get_spf_from_txt_values(
record,
[
'v=spf1 include:example.com',
' ip4:1.2.3.4 ~all',
'v=DMARC1\; p=reject\;',
],
record, ['v=spf1 include:example.com', 'v=DMARC1\; p=reject\;']
), ),
) )
self.assertEqual( self.assertEqual(
'v=spf1 +mx redirect=',
'v=spf1 +mx redirect=example.com',
processor._get_spf_from_txt_values( processor._get_spf_from_txt_values(
record, record,
['v=spf1 +mx redirect=example.com', 'v=DMARC1\; p=reject\;'], ['v=spf1 +mx redirect=example.com', 'v=DMARC1\; p=reject\;'],
@ -117,7 +101,7 @@ class TestSpfDnsLookupProcessor(TestCase):
) )
) )
resolver_mock.reset_mock()
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock() txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = '"v=spf1 -all"' txt_value_mock.to_text.return_value = '"v=spf1 -all"'
resolver_mock.return_value = [txt_value_mock] resolver_mock.return_value = [txt_value_mock]
@ -178,7 +162,7 @@ class TestSpfDnsLookupProcessor(TestCase):
) )
) )
resolver_mock.reset_mock()
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock() txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = ( txt_value_mock.to_text.return_value = (
'"v=spf1 a a a a a a a a a a a -all"' '"v=spf1 a a a a a a a a a a a -all"'
@ -205,7 +189,33 @@ class TestSpfDnsLookupProcessor(TestCase):
) )
) )
resolver_mock.reset_mock()
resolver_mock.reset_mock(return_value=True, side_effect=True)
txt_value_mock = MagicMock()
txt_value_mock.to_text.return_value = (
'"v=spf1 ip4:1.2.3.4" " ip4:4.3.2.1 -all"'
)
resolver_mock.return_value = [txt_value_mock]
self.assertEqual(zone, processor.process_source_zone(zone))
resolver_mock.assert_called_once_with('example.com', 'TXT')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:example.com -all',
'v=DMARC1\; p=reject\;',
],
},
)
)
resolver_mock.reset_mock(return_value=True, side_effect=True)
first_txt_value_mock = MagicMock() first_txt_value_mock = MagicMock()
first_txt_value_mock.to_text.return_value = ( first_txt_value_mock.to_text.return_value = (
'"v=spf1 include:_spf.example.com -all"' '"v=spf1 include:_spf.example.com -all"'


Loading…
Cancel
Save