Browse Source

break record tests into individual files

pull/969/head
Ross McFarland 3 years ago
parent
commit
90c0402b00
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
27 changed files with 6501 additions and 6129 deletions
  1. +1
    -1
      octodns/record/__init__.py
  2. +0
    -0
      octodns/record/urlfwd.py
  3. +256
    -6127
      tests/test_octodns_record.py
  4. +181
    -0
      tests/test_octodns_record_a.py
  5. +227
    -0
      tests/test_octodns_record_aaaa.py
  6. +108
    -0
      tests/test_octodns_record_alias.py
  7. +273
    -0
      tests/test_octodns_record_caa.py
  8. +92
    -0
      tests/test_octodns_record_change.py
  9. +37
    -0
      tests/test_octodns_record_chunked.py
  10. +136
    -0
      tests/test_octodns_record_cname.py
  11. +94
    -0
      tests/test_octodns_record_dname.py
  12. +206
    -0
      tests/test_octodns_record_ds.py
  13. +1284
    -0
      tests/test_octodns_record_dynamic.py
  14. +184
    -1
      tests/test_octodns_record_geo.py
  15. +30
    -0
      tests/test_octodns_record_ip.py
  16. +697
    -0
      tests/test_octodns_record_loc.py
  17. +265
    -0
      tests/test_octodns_record_mx.py
  18. +438
    -0
      tests/test_octodns_record_naptr.py
  19. +83
    -0
      tests/test_octodns_record_ns.py
  20. +89
    -0
      tests/test_octodns_record_ptr.py
  21. +71
    -0
      tests/test_octodns_record_spf.py
  22. +432
    -0
      tests/test_octodns_record_srv.py
  23. +330
    -0
      tests/test_octodns_record_sshfp.py
  24. +31
    -0
      tests/test_octodns_record_target.py
  25. +421
    -0
      tests/test_octodns_record_tlsa.py
  26. +144
    -0
      tests/test_octodns_record_txt.py
  27. +391
    -0
      tests/test_octodns_record_urlfwd.py

+ 1
- 1
octodns/record/__init__.py View File

@ -25,7 +25,7 @@ from .srv import SrvRecord, SrvValue
from .sshfp import SshfpRecord, SshfpValue
from .tlsa import TlsaRecord, TlsaValue
from .txt import TxtValue, TxtRecord
from .url import UrlfwdRecord, UrlfwdValue
from .urlfwd import UrlfwdRecord, UrlfwdValue
# quell warnings
ARecord


octodns/record/url.py → octodns/record/urlfwd.py View File


+ 256
- 6127
tests/test_octodns_record.py
File diff suppressed because it is too large
View File


+ 181
- 0
tests/test_octodns_record_a.py View File

@ -0,0 +1,181 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.a import ARecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordA(TestCase):
zone = Zone('unit.tests.', [])
def test_a_and_record(self):
a_values = ['1.2.3.4', '2.2.3.4']
a_data = {'ttl': 30, 'values': a_values}
a = ARecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values, a.values)
self.assertEqual(a_data, a.data)
b_value = '3.2.3.4'
b_data = {'ttl': 30, 'value': b_value}
b = ARecord(self.zone, 'b', b_data)
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
# top-level
data = {'ttl': 30, 'value': '4.2.3.4'}
self.assertEqual(self.zone.name, ARecord(self.zone, '', data).fqdn)
self.assertEqual(self.zone.name, ARecord(self.zone, None, data).fqdn)
# ARecord equate with itself
self.assertTrue(a == a)
# Records with differing names and same type don't equate
self.assertFalse(a == b)
# Records with same name & type equate even if ttl is different
self.assertTrue(
a == ARecord(self.zone, 'a', {'ttl': 31, 'values': a_values})
)
# Records with same name & type equate even if values are different
self.assertTrue(
a == ARecord(self.zone, 'a', {'ttl': 30, 'value': b_value})
)
target = SimpleProvider()
# no changes if self
self.assertFalse(a.changes(a, target))
# no changes if clone
other = ARecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
self.assertFalse(a.changes(other, target))
# changes if ttl modified
other.ttl = 31
update = a.changes(other, target)
self.assertEqual(a, update.existing)
self.assertEqual(other, update.new)
# changes if values modified
other.ttl = a.ttl
other.values = ['4.4.4.4']
update = a.changes(other, target)
self.assertEqual(a, update.existing)
self.assertEqual(other, update.new)
# Hashing
records = set()
records.add(a)
self.assertTrue(a in records)
self.assertFalse(b in records)
records.add(b)
self.assertTrue(b in records)
# __repr__ doesn't blow up
a.__repr__()
# Record.__repr__ does
with self.assertRaises(NotImplementedError):
class DummyRecord(Record):
def __init__(self):
pass
DummyRecord().__repr__()
def test_validation_and_values_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {'type': 'A', 'ttl': 600, 'value': '1.2.3.4'})
Record.new(
self.zone, '', {'type': 'A', 'ttl': 600, 'values': ['1.2.3.4']}
)
Record.new(
self.zone,
'',
{'type': 'A', 'ttl': 600, 'values': ['1.2.3.4', '1.2.3.5']},
)
# missing value(s), no value or value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'A', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s), empty values
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'A', 'ttl': 600, 'values': []}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s), None values
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'A', 'ttl': 600, 'values': None}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s) and empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'A', 'ttl': 600, 'values': [None, '']},
)
self.assertEqual(
['missing value(s)', 'empty value'], ctx.exception.reasons
)
# missing value(s), None value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'A', 'ttl': 600, 'value': None}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# empty value, empty string value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {'type': 'A', 'ttl': 600, 'value': ''})
self.assertEqual(['empty value'], ctx.exception.reasons)
# missing value(s) & ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'A'})
self.assertEqual(
['missing ttl', 'missing value(s)'], ctx.exception.reasons
)
# invalid ipv4 address
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'A', 'ttl': 600, 'value': 'hello'}
)
self.assertEqual(
['invalid IPv4 address "hello"'], ctx.exception.reasons
)
# invalid ipv4 addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'A', 'ttl': 600, 'values': ['hello', 'goodbye']},
)
self.assertEqual(
['invalid IPv4 address "hello"', 'invalid IPv4 address "goodbye"'],
ctx.exception.reasons,
)
# invalid & valid ipv4 addresses, no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'A', 'values': ['1.2.3.4', 'hello', '5.6.7.8']},
)
self.assertEqual(
['missing ttl', 'invalid IPv4 address "hello"'],
ctx.exception.reasons,
)

+ 227
- 0
tests/test_octodns_record_aaaa.py View File

@ -0,0 +1,227 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.aaaa import AaaaRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
class TestRecordAaaa(TestCase):
zone = Zone('unit.tests.', [])
def assertMultipleValues(self, _type, a_values, b_value):
a_data = {'ttl': 30, 'values': a_values}
a = _type(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values, a.values)
self.assertEqual(a_data, a.data)
b_data = {'ttl': 30, 'value': b_value}
b = _type(self.zone, 'b', b_data)
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
def test_aaaa(self):
a_values = [
'2001:db8:3c4d:15::1a2f:1a2b',
'2001:db8:3c4d:15::1a2f:1a3b',
]
b_value = '2001:db8:3c4d:15::1a2f:1a4b'
self.assertMultipleValues(AaaaRecord, a_values, b_value)
# Specifically validate that we normalize IPv6 addresses
values = [
'2001:db8:3c4d:15:0000:0000:1a2f:1a2b',
'2001:0db8:3c4d:0015::1a2f:1a3b',
]
data = {'ttl': 30, 'values': values}
record = AaaaRecord(self.zone, 'aaaa', data)
self.assertEqual(a_values, record.values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'ttl': 600,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
},
)
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'ttl': 600,
'values': ['2601:644:500:e210:62f8:1dff:feb8:947a'],
},
)
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'ttl': 600,
'values': [
'2601:644:500:e210:62f8:1dff:feb8:947a',
'2601:642:500:e210:62f8:1dff:feb8:947a',
],
},
)
# missing value(s), no value or value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'AAAA', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s), empty values
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'AAAA', 'ttl': 600, 'values': []}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s), None values
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'AAAA', 'ttl': 600, 'values': None}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing value(s) and empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'AAAA', 'ttl': 600, 'values': [None, '']},
)
self.assertEqual(
['missing value(s)', 'empty value'], ctx.exception.reasons
)
# missing value(s), None value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'AAAA', 'ttl': 600, 'value': None}
)
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# empty value, empty string value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'AAAA', 'ttl': 600, 'value': ''}
)
self.assertEqual(['empty value'], ctx.exception.reasons)
# missing value(s) & ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'AAAA'})
self.assertEqual(
['missing ttl', 'missing value(s)'], ctx.exception.reasons
)
# invalid IPv6 address
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'AAAA', 'ttl': 600, 'value': 'hello'}
)
self.assertEqual(
['invalid IPv6 address "hello"'], ctx.exception.reasons
)
# invalid IPv6 addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'AAAA', 'ttl': 600, 'values': ['hello', 'goodbye']},
)
self.assertEqual(
['invalid IPv6 address "hello"', 'invalid IPv6 address "goodbye"'],
ctx.exception.reasons,
)
# invalid & valid IPv6 addresses, no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'values': [
'2601:644:500:e210:62f8:1dff:feb8:947a',
'hello',
'2601:642:500:e210:62f8:1dff:feb8:947a',
],
},
)
self.assertEqual(
['missing ttl', 'invalid IPv6 address "hello"'],
ctx.exception.reasons,
)
def test_more_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'ttl': 600,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
},
)
Record.new(
self.zone,
'',
{
'type': 'AAAA',
'ttl': 600,
'values': [
'2601:644:500:e210:62f8:1dff:feb8:947a',
'2601:644:500:e210:62f8:1dff:feb8:947b',
],
},
)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'AAAA', 'ttl': 600, 'value': 'hello'}
)
self.assertEqual(
['invalid IPv6 address "hello"'], ctx.exception.reasons
)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'AAAA', 'ttl': 600, 'values': ['1.2.3.4', '2.3.4.5']},
)
self.assertEqual(
[
'invalid IPv6 address "1.2.3.4"',
'invalid IPv6 address "2.3.4.5"',
],
ctx.exception.reasons,
)
# invalid ip addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'AAAA', 'ttl': 600, 'values': ['hello', 'goodbye']},
)
self.assertEqual(
['invalid IPv6 address "hello"', 'invalid IPv6 address "goodbye"'],
ctx.exception.reasons,
)

+ 108
- 0
tests/test_octodns_record_alias.py View File

@ -0,0 +1,108 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.alias import AliasRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordAlias(TestCase):
zone = Zone('unit.tests.', [])
def test_alias(self):
a_data = {'ttl': 0, 'value': 'www.unit.tests.'}
a = AliasRecord(self.zone, '', a_data)
self.assertEqual('', a.name)
self.assertEqual('unit.tests.', a.fqdn)
self.assertEqual(0, a.ttl)
self.assertEqual(a_data['value'], a.value)
self.assertEqual(a_data, a.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in value causes change
other = AliasRecord(self.zone, 'a', a_data)
other.value = 'foo.unit.tests.'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
'aliasUppwerValue',
{'ttl': 30, 'type': 'ALIAS', 'value': 'GITHUB.COM'},
)
lower_record = AliasRecord(
self.zone,
'aliasLowerValue',
{'ttl': 30, 'type': 'ALIAS', 'value': 'github.com'},
)
self.assertEqual(upper_record.value, lower_record.value)
def test_validation_and_value_mixin(self):
# doesn't blow up
Record.new(
self.zone,
'',
{'type': 'ALIAS', 'ttl': 600, 'value': 'foo.bar.com.'},
)
# root only
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'nope',
{'type': 'ALIAS', 'ttl': 600, 'value': 'foo.bar.com.'},
)
self.assertEqual(['non-root ALIAS not allowed'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'ALIAS', 'ttl': 600})
self.assertEqual(['missing value'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'ALIAS', 'ttl': 600, 'value': None}
)
self.assertEqual(['missing value'], ctx.exception.reasons)
# empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'ALIAS', 'ttl': 600, 'value': ''}
)
self.assertEqual(['empty value'], ctx.exception.reasons)
# not a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'ALIAS', 'ttl': 600, 'value': '__.'}
)
self.assertEqual(
['ALIAS value "__." is not a valid FQDN'], ctx.exception.reasons
)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'ALIAS', 'ttl': 600, 'value': 'foo.bar.com'},
)
self.assertEqual(
['ALIAS value "foo.bar.com" missing trailing .'],
ctx.exception.reasons,
)

+ 273
- 0
tests/test_octodns_record_caa.py View File

@ -0,0 +1,273 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.caa import CaaRecord, CaaValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordCaa(TestCase):
zone = Zone('unit.tests.', [])
def test_caa(self):
a_values = [
CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}),
CaaValue(
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = CaaRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['flags'], a.values[0].flags)
self.assertEqual(a_values[0]['tag'], a.values[0].tag)
self.assertEqual(a_values[0]['value'], a.values[0].value)
self.assertEqual(a_values[1]['flags'], a.values[1].flags)
self.assertEqual(a_values[1]['tag'], a.values[1].tag)
self.assertEqual(a_values[1]['value'], a.values[1].value)
self.assertEqual(a_data, a.data)
b_value = CaaValue(
{'tag': 'iodef', 'value': 'http://iodef.example.com/'}
)
b_data = {'ttl': 30, 'value': b_value}
b = CaaRecord(self.zone, 'b', b_data)
self.assertEqual(0, b.values[0].flags)
self.assertEqual(b_value['tag'], b.values[0].tag)
self.assertEqual(b_value['value'], b.values[0].value)
b_data['value']['flags'] = 0
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in flags causes change
other = CaaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].flags = 128
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in tag causes change
other.values[0].flags = a.values[0].flags
other.values[0].tag = 'foo'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in value causes change
other.values[0].tag = a.values[0].tag
other.values[0].value = 'bar'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_caa_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('0 tag')
# 4th word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('1 tag value another')
# flags not an int, will parse
self.assertEqual(
{'flags': 'one', 'tag': 'tag', 'value': 'value'},
CaaValue.parse_rdata_text('one tag value'),
)
# valid
self.assertEqual(
{'flags': 0, 'tag': 'tag', 'value': '99148c81'},
CaaValue.parse_rdata_text('0 tag 99148c81'),
)
zone = Zone('unit.tests.', [])
a = CaaRecord(
zone,
'caa',
{
'ttl': 32,
'values': [
{'flags': 1, 'tag': 'tag1', 'value': '99148c81'},
{'flags': 2, 'tag': 'tag2', 'value': '99148c44'},
],
},
)
self.assertEqual(1, a.values[0].flags)
self.assertEqual('tag1', a.values[0].tag)
self.assertEqual('99148c81', a.values[0].value)
self.assertEqual('1 tag1 99148c81', a.values[0].rdata_text)
self.assertEqual(2, a.values[1].flags)
self.assertEqual('tag2', a.values[1].tag)
self.assertEqual('99148c44', a.values[1].value)
self.assertEqual('2 tag2 99148c44', a.values[1].rdata_text)
def test_caa_value(self):
a = CaaValue({'flags': 0, 'tag': 'a', 'value': 'v'})
b = CaaValue({'flags': 1, 'tag': 'a', 'value': 'v'})
c = CaaValue({'flags': 0, 'tag': 'c', 'value': 'v'})
d = CaaValue({'flags': 0, 'tag': 'a', 'value': 'z'})
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(a < d)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(b > d)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(c > d)
self.assertTrue(d > a)
self.assertTrue(d < b)
self.assertTrue(d < c)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= d)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= d)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= d)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertTrue(d >= a)
self.assertTrue(d <= b)
self.assertTrue(d <= c)
self.assertTrue(d >= d)
self.assertTrue(d <= d)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'CAA',
'ttl': 600,
'value': {
'flags': 128,
'tag': 'iodef',
'value': 'http://foo.bar.com/',
},
},
)
# invalid flags
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'CAA',
'ttl': 600,
'value': {
'flags': -42,
'tag': 'iodef',
'value': 'http://foo.bar.com/',
},
},
)
self.assertEqual(['invalid flags "-42"'], ctx.exception.reasons)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'CAA',
'ttl': 600,
'value': {
'flags': 442,
'tag': 'iodef',
'value': 'http://foo.bar.com/',
},
},
)
self.assertEqual(['invalid flags "442"'], ctx.exception.reasons)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'CAA',
'ttl': 600,
'value': {
'flags': 'nope',
'tag': 'iodef',
'value': 'http://foo.bar.com/',
},
},
)
self.assertEqual(['invalid flags "nope"'], ctx.exception.reasons)
# missing tag
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'CAA',
'ttl': 600,
'value': {'value': 'http://foo.bar.com/'},
},
)
self.assertEqual(['missing tag'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'CAA', 'ttl': 600, 'value': {'tag': 'iodef'}},
)
self.assertEqual(['missing value'], ctx.exception.reasons)

+ 92
- 0
tests/test_octodns_record_change.py View File

@ -0,0 +1,92 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.change import Create, Delete, Update
from octodns.zone import Zone
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])
record_a_1 = Record.new(
zone, '1', {'type': 'A', 'ttl': 30, 'value': '1.2.3.4'}
)
record_a_2 = Record.new(
zone, '2', {'type': 'A', 'ttl': 30, 'value': '1.2.3.4'}
)
record_aaaa_1 = Record.new(
zone,
'1',
{
'type': 'AAAA',
'ttl': 30,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
},
)
record_aaaa_2 = Record.new(
zone,
'2',
{
'type': 'AAAA',
'ttl': 30,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
},
)
def test_sort_same_change_type(self):
# expect things to be ordered by name and type since all the change
# types are the same it doesn't matter
changes = [
Create(self.record_aaaa_1),
Create(self.record_a_2),
Create(self.record_a_1),
Create(self.record_aaaa_2),
]
self.assertEqual(
[
Create(self.record_a_1),
Create(self.record_aaaa_1),
Create(self.record_a_2),
Create(self.record_aaaa_2),
],
sorted(changes),
)
def test_sort_same_different_type(self):
# this time the change type is the deciding factor, deletes come before
# creates, and then updates. Things of the same type, go down the line
# and sort by name, and then type
changes = [
Delete(self.record_aaaa_1),
Create(self.record_aaaa_1),
Update(self.record_aaaa_1, self.record_aaaa_1),
Update(self.record_a_1, self.record_a_1),
Create(self.record_a_1),
Delete(self.record_a_1),
Delete(self.record_aaaa_2),
Create(self.record_aaaa_2),
Update(self.record_aaaa_2, self.record_aaaa_2),
Update(self.record_a_2, self.record_a_2),
Create(self.record_a_2),
Delete(self.record_a_2),
]
self.assertEqual(
[
Delete(self.record_a_1),
Delete(self.record_aaaa_1),
Delete(self.record_a_2),
Delete(self.record_aaaa_2),
Create(self.record_a_1),
Create(self.record_aaaa_1),
Create(self.record_a_2),
Create(self.record_aaaa_2),
Update(self.record_a_1, self.record_a_1),
Update(self.record_aaaa_1, self.record_aaaa_1),
Update(self.record_a_2, self.record_a_2),
Update(self.record_aaaa_2, self.record_aaaa_2),
],
sorted(changes),
)

+ 37
- 0
tests/test_octodns_record_chunked.py View File

@ -0,0 +1,37 @@
#
#
#
from unittest import TestCase
from octodns.record.chunked import _ChunkedValue
from octodns.record.spf import SpfRecord
from octodns.zone import Zone
class TestRecordChunked(TestCase):
def test_chunked_value_rdata_text(self):
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _ChunkedValue.parse_rdata_text(s))
# semi-colons are escaped
self.assertEqual(
'Hello\\; World!', _ChunkedValue.parse_rdata_text('Hello; World!')
)
# since we're always a string validate and __init__ don't
# parse_rdata_text
zone = Zone('unit.tests.', [])
a = SpfRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)

+ 136
- 0
tests/test_octodns_record_cname.py View File

@ -0,0 +1,136 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.cname import CnameRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordCname(TestCase):
zone = Zone('unit.tests.', [])
def assertSingleValue(self, _type, a_value, b_value):
a_data = {'ttl': 30, 'value': a_value}
a = _type(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_value, a.value)
self.assertEqual(a_data, a.data)
b_data = {'ttl': 30, 'value': b_value}
b = _type(self.zone, 'b', b_data)
self.assertEqual(b_value, b.value)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in value causes change
other = _type(self.zone, 'a', {'ttl': 30, 'value': b_value})
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_cname(self):
self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.')
def test_cname_lowering_value(self):
upper_record = CnameRecord(
self.zone,
'CnameUppwerValue',
{'ttl': 30, 'type': 'CNAME', 'value': 'GITHUB.COM'},
)
lower_record = CnameRecord(
self.zone,
'CnameLowerValue',
{'ttl': 30, 'type': 'CNAME', 'value': 'github.com'},
)
self.assertEqual(upper_record.value, lower_record.value)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'www',
{'type': 'CNAME', 'ttl': 600, 'value': 'foo.bar.com.'},
)
# root cname is a no-no
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'CNAME', 'ttl': 600, 'value': 'foo.bar.com.'},
)
self.assertEqual(['root CNAME not allowed'], ctx.exception.reasons)
# not a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'CNAME', 'ttl': 600, 'value': '___.'}
)
self.assertEqual(
['CNAME value "___." is not a valid FQDN'], ctx.exception.reasons
)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'CNAME', 'ttl': 600, 'value': 'foo.bar.com'},
)
self.assertEqual(
['CNAME value "foo.bar.com" missing trailing .'],
ctx.exception.reasons,
)
# doesn't allow urls
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'CNAME', 'ttl': 600, 'value': 'https://google.com'},
)
self.assertEqual(
['CNAME value "https://google.com" is not a valid FQDN'],
ctx.exception.reasons,
)
# doesn't allow urls with paths
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{
'type': 'CNAME',
'ttl': 600,
'value': 'https://google.com/a/b/c',
},
)
self.assertEqual(
['CNAME value "https://google.com/a/b/c" is not a valid FQDN'],
ctx.exception.reasons,
)
# doesn't allow paths
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'CNAME', 'ttl': 600, 'value': 'google.com/some/path'},
)
self.assertEqual(
['CNAME value "google.com/some/path" is not a valid FQDN'],
ctx.exception.reasons,
)

+ 94
- 0
tests/test_octodns_record_dname.py View File

@ -0,0 +1,94 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.dname import DnameRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordDname(TestCase):
zone = Zone('unit.tests.', [])
def assertSingleValue(self, _type, a_value, b_value):
a_data = {'ttl': 30, 'value': a_value}
a = _type(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_value, a.value)
self.assertEqual(a_data, a.data)
b_data = {'ttl': 30, 'value': b_value}
b = _type(self.zone, 'b', b_data)
self.assertEqual(b_value, b.value)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in value causes change
other = _type(self.zone, 'a', {'ttl': 30, 'value': b_value})
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_dname(self):
self.assertSingleValue(DnameRecord, 'target.foo.com.', 'other.foo.com.')
def test_dname_lowering_value(self):
upper_record = DnameRecord(
self.zone,
'DnameUppwerValue',
{'ttl': 30, 'type': 'DNAME', 'value': 'GITHUB.COM'},
)
lower_record = DnameRecord(
self.zone,
'DnameLowerValue',
{'ttl': 30, 'type': 'DNAME', 'value': 'github.com'},
)
self.assertEqual(upper_record.value, lower_record.value)
def test_validation(self):
# A valid DNAME record.
Record.new(
self.zone,
'sub',
{'type': 'DNAME', 'ttl': 600, 'value': 'foo.bar.com.'},
)
# A DNAME record can be present at the zone APEX.
Record.new(
self.zone,
'',
{'type': 'DNAME', 'ttl': 600, 'value': 'foo.bar.com.'},
)
# not a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, 'www', {'type': 'DNAME', 'ttl': 600, 'value': '.'}
)
self.assertEqual(
['DNAME value "." is not a valid FQDN'], ctx.exception.reasons
)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'www',
{'type': 'DNAME', 'ttl': 600, 'value': 'foo.bar.com'},
)
self.assertEqual(
['DNAME value "foo.bar.com" missing trailing .'],
ctx.exception.reasons,
)

+ 206
- 0
tests/test_octodns_record_ds.py View File

@ -0,0 +1,206 @@
#
#
#
from unittest import TestCase
from octodns.record.ds import DsRecord, DsValue
from octodns.record.rr import RrParseError
from octodns.zone import Zone
class TestRecordDs(TestCase):
def test_ds(self):
for a, b in (
# diff flags
(
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
{
'flags': 1,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
),
# diff protocol
(
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
{
'flags': 0,
'protocol': 2,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
),
# diff algorithm
(
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
{
'flags': 0,
'protocol': 1,
'algorithm': 3,
'public_key': 'abcdef0123456',
},
),
# diff public_key
(
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
},
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'bcdef0123456a',
},
),
):
a = DsValue(a)
self.assertEqual(a, a)
b = DsValue(b)
self.assertEqual(b, b)
self.assertNotEqual(a, b)
self.assertNotEqual(b, a)
self.assertTrue(a < b)
# empty string won't parse
with self.assertRaises(RrParseError):
DsValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
DsValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
DsValue.parse_rdata_text('0 1')
# 3rd word won't parse
with self.assertRaises(RrParseError):
DsValue.parse_rdata_text('0 1 2')
# 5th word won't parse
with self.assertRaises(RrParseError):
DsValue.parse_rdata_text('0 1 2 key blah')
# things ints, will parse
self.assertEqual(
{
'flags': 'one',
'protocol': 'two',
'algorithm': 'three',
'public_key': 'key',
},
DsValue.parse_rdata_text('one two three key'),
)
# valid
data = {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
}
self.assertEqual(data, DsValue.parse_rdata_text('0 1 2 99148c81'))
self.assertEqual([], DsValue.validate(data, 'DS'))
# missing flags
data = {'protocol': 1, 'algorithm': 2, 'public_key': '99148c81'}
self.assertEqual(['missing flags'], DsValue.validate(data, 'DS'))
# invalid flags
data = {
'flags': 'a',
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
}
self.assertEqual(['invalid flags "a"'], DsValue.validate(data, 'DS'))
# missing protocol
data = {'flags': 1, 'algorithm': 2, 'public_key': '99148c81'}
self.assertEqual(['missing protocol'], DsValue.validate(data, 'DS'))
# invalid protocol
data = {
'flags': 1,
'protocol': 'a',
'algorithm': 2,
'public_key': '99148c81',
}
self.assertEqual(['invalid protocol "a"'], DsValue.validate(data, 'DS'))
# missing algorithm
data = {'flags': 1, 'protocol': 2, 'public_key': '99148c81'}
self.assertEqual(['missing algorithm'], DsValue.validate(data, 'DS'))
# invalid algorithm
data = {
'flags': 1,
'protocol': 2,
'algorithm': 'a',
'public_key': '99148c81',
}
self.assertEqual(
['invalid algorithm "a"'], DsValue.validate(data, 'DS')
)
# missing algorithm (list)
data = {'flags': 1, 'protocol': 2, 'algorithm': 3}
self.assertEqual(['missing public_key'], DsValue.validate([data], 'DS'))
zone = Zone('unit.tests.', [])
values = [
{
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
},
{
'flags': 1,
'protocol': 2,
'algorithm': 3,
'public_key': '99148c44',
},
]
a = DsRecord(zone, 'ds', {'ttl': 32, 'values': values})
self.assertEqual(0, a.values[0].flags)
a.values[0].flags += 1
self.assertEqual(1, a.values[0].flags)
self.assertEqual(1, a.values[0].protocol)
a.values[0].protocol += 1
self.assertEqual(2, a.values[0].protocol)
self.assertEqual(2, a.values[0].algorithm)
a.values[0].algorithm += 1
self.assertEqual(3, a.values[0].algorithm)
self.assertEqual('99148c81', a.values[0].public_key)
a.values[0].public_key = '99148c42'
self.assertEqual('99148c42', a.values[0].public_key)
self.assertEqual(1, a.values[1].flags)
self.assertEqual(2, a.values[1].protocol)
self.assertEqual(3, a.values[1].algorithm)
self.assertEqual('99148c44', a.values[1].public_key)
self.assertEqual(DsValue(values[1]), a.values[1].data)
self.assertEqual('1 2 3 99148c44', a.values[1].rdata_text)
self.assertEqual('1 2 3 99148c44', a.values[1].__repr__())

+ 1284
- 0
tests/test_octodns_record_dynamic.py
File diff suppressed because it is too large
View File


+ 184
- 1
tests/test_octodns_record_geo.py View File

@ -4,10 +4,74 @@
from unittest import TestCase
from octodns.record.geo import GeoCodes
from octodns.record import Record
from octodns.record.a import ARecord
from octodns.record.geo import GeoCodes, GeoValue
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider, GeoProvider
class TestRecordGeo(TestCase):
zone = Zone('unit.tests.', [])
def test_geo(self):
geo_data = {
'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {
'AF': ['1.1.1.1'],
'AS-JP': ['2.2.2.2', '3.3.3.3'],
'NA-US': ['4.4.4.4', '5.5.5.5'],
'NA-US-CA': ['6.6.6.6', '7.7.7.7'],
},
}
geo = ARecord(self.zone, 'geo', geo_data)
self.assertEqual(geo_data, geo.data)
other_data = {
'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {
'AF': ['1.1.1.1'],
'AS-JP': ['2.2.2.2', '3.3.3.3'],
'NA-US': ['4.4.4.4', '5.5.5.5'],
'NA-US-CA': ['6.6.6.6', '7.7.7.7'],
},
}
other = ARecord(self.zone, 'geo', other_data)
self.assertEqual(other_data, other.data)
simple_target = SimpleProvider()
geo_target = GeoProvider()
# Geo provider doesn't consider identical geo to be changes
self.assertFalse(geo.changes(geo, geo_target))
# geo values don't impact equality
other.geo['AF'].values = ['9.9.9.9']
self.assertTrue(geo == other)
# Non-geo supporting provider doesn't consider geo diffs to be changes
self.assertFalse(geo.changes(other, simple_target))
# Geo provider does consider geo diffs to be changes
self.assertTrue(geo.changes(other, geo_target))
# Object without geo doesn't impact equality
other.geo = {}
self.assertTrue(geo == other)
# Non-geo supporting provider doesn't consider lack of geo a diff
self.assertFalse(geo.changes(other, simple_target))
# Geo provider does consider lack of geo diffs to be changes
self.assertTrue(geo.changes(other, geo_target))
# __repr__ doesn't blow up
geo.__repr__()
class TestRecordGeoCodes(TestCase):
zone = Zone('unit.tests.', [])
def test_validate(self):
prefix = 'xyz '
@ -104,3 +168,122 @@ class TestRecordGeoCodes(TestCase):
self.assertEqual('NA-CA-AB', GeoCodes.province_to_code('AB'))
self.assertEqual('NA-CA-BC', GeoCodes.province_to_code('BC'))
self.assertFalse(GeoCodes.province_to_code('XX'))
def test_geo_value(self):
code = 'NA-US-CA'
values = ['1.2.3.4']
geo = GeoValue(code, values)
self.assertEqual(code, geo.code)
self.assertEqual('NA', geo.continent_code)
self.assertEqual('US', geo.country_code)
self.assertEqual('CA', geo.subdivision_code)
self.assertEqual(values, geo.values)
self.assertEqual(['NA-US', 'NA'], list(geo.parents))
a = GeoValue('NA-US-CA', values)
b = GeoValue('AP-JP', values)
c = GeoValue('NA-US-CA', ['2.3.4.5'])
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertTrue(a > b)
self.assertTrue(a < c)
self.assertTrue(b < a)
self.assertTrue(b < c)
self.assertTrue(c > a)
self.assertTrue(c > b)
self.assertTrue(a >= a)
self.assertTrue(a >= b)
self.assertTrue(a <= c)
self.assertTrue(b <= a)
self.assertTrue(b <= b)
self.assertTrue(b <= c)
self.assertTrue(c > a)
self.assertTrue(c > b)
self.assertTrue(c >= b)
def test_validation(self):
Record.new(
self.zone,
'',
{
'geo': {'NA': ['1.2.3.5'], 'NA-US': ['1.2.3.5', '1.2.3.6']},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
},
)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'geo': {'NA': ['hello'], 'NA-US': ['1.2.3.5', '1.2.3.6']},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
},
)
self.assertEqual(
['invalid IPv4 address "hello"'], ctx.exception.reasons
)
# invalid geo code
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'geo': {'XYZ': ['1.2.3.4']},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
},
)
self.assertEqual(['invalid geo "XYZ"'], ctx.exception.reasons)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'geo': {'NA': ['hello'], 'NA-US': ['1.2.3.5', 'goodbye']},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
},
)
self.assertEqual(
['invalid IPv4 address "hello"', 'invalid IPv4 address "goodbye"'],
ctx.exception.reasons,
)
# invalid healthcheck protocol
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'a',
{
'geo': {'NA': ['1.2.3.5'], 'NA-US': ['1.2.3.5', '1.2.3.6']},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
'octodns': {'healthcheck': {'protocol': 'FTP'}},
},
)
self.assertEqual(
['invalid healthcheck protocol'], ctx.exception.reasons
)

+ 30
- 0
tests/test_octodns_record_ip.py View File

@ -0,0 +1,30 @@
#
#
#
from unittest import TestCase
from octodns.record.a import ARecord, Ipv4Value
from octodns.zone import Zone
class TestRecordIp(TestCase):
def test_ipv4_value_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, Ipv4Value.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'})
self.assertEqual('1.2.3.4', a.values[0].rdata_text)

+ 697
- 0
tests/test_octodns_record_loc.py View File

@ -0,0 +1,697 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.loc import LocRecord, LocValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordLoc(TestCase):
zone = Zone('unit.tests.', [])
def test_loc(self):
a_values = [
LocValue(
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
)
]
a_data = {'ttl': 30, 'values': a_values}
a = LocRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['lat_degrees'], a.values[0].lat_degrees)
self.assertEqual(a_values[0]['lat_minutes'], a.values[0].lat_minutes)
self.assertEqual(a_values[0]['lat_seconds'], a.values[0].lat_seconds)
self.assertEqual(
a_values[0]['lat_direction'], a.values[0].lat_direction
)
self.assertEqual(a_values[0]['long_degrees'], a.values[0].long_degrees)
self.assertEqual(a_values[0]['long_minutes'], a.values[0].long_minutes)
self.assertEqual(a_values[0]['long_seconds'], a.values[0].long_seconds)
self.assertEqual(
a_values[0]['long_direction'], a.values[0].long_direction
)
self.assertEqual(a_values[0]['altitude'], a.values[0].altitude)
self.assertEqual(a_values[0]['size'], a.values[0].size)
self.assertEqual(
a_values[0]['precision_horz'], a.values[0].precision_horz
)
self.assertEqual(
a_values[0]['precision_vert'], a.values[0].precision_vert
)
b_value = LocValue(
{
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = LocRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['lat_degrees'], b.values[0].lat_degrees)
self.assertEqual(b_value['lat_minutes'], b.values[0].lat_minutes)
self.assertEqual(b_value['lat_seconds'], b.values[0].lat_seconds)
self.assertEqual(b_value['lat_direction'], b.values[0].lat_direction)
self.assertEqual(b_value['long_degrees'], b.values[0].long_degrees)
self.assertEqual(b_value['long_minutes'], b.values[0].long_minutes)
self.assertEqual(b_value['long_seconds'], b.values[0].long_seconds)
self.assertEqual(b_value['long_direction'], b.values[0].long_direction)
self.assertEqual(b_value['altitude'], b.values[0].altitude)
self.assertEqual(b_value['size'], b.values[0].size)
self.assertEqual(b_value['precision_horz'], b.values[0].precision_horz)
self.assertEqual(b_value['precision_vert'], b.values[0].precision_vert)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in lat_direction causes change
other = LocRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].lat_direction = 'N'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in altitude causes change
other.values[0].altitude = a.values[0].altitude
other.values[0].altitude = -10
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_loc_value_rdata_text(self):
# only the exact correct number of words is allowed
for i in tuple(range(0, 12)) + (13,):
s = ''.join(['word'] * i)
with self.assertRaises(RrParseError):
LocValue.parse_rdata_text(s)
# type conversions are best effort
self.assertEqual(
{
'altitude': 'six',
'lat_degrees': 'zero',
'lat_direction': 'S',
'lat_minutes': 'one',
'lat_seconds': 'two',
'long_degrees': 'three',
'long_direction': 'W',
'long_minutes': 'four',
'long_seconds': 'five',
'precision_horz': 'eight',
'precision_vert': 'nine',
'size': 'seven',
},
LocValue.parse_rdata_text(
'zero one two S three four five W six seven eight nine'
),
)
# valid
s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m'
self.assertEqual(
{
'altitude': 6.6,
'lat_degrees': 0,
'lat_direction': 'N',
'lat_minutes': 1,
'lat_seconds': 2.2,
'long_degrees': 3,
'long_direction': 'E',
'long_minutes': 4,
'long_seconds': 5.5,
'precision_horz': 8.8,
'precision_vert': 9.9,
'size': 7.7,
},
LocValue.parse_rdata_text(s),
)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = LocRecord(
zone,
'mx',
{
'type': 'LOC',
'ttl': 42,
'value': {
'altitude': 6.6,
'lat_degrees': 0,
'lat_direction': 'N',
'lat_minutes': 1,
'lat_seconds': 2.2,
'long_degrees': 3,
'long_direction': 'E',
'long_minutes': 4,
'long_seconds': 5.5,
'precision_horz': 8.8,
'precision_vert': 9.9,
'size': 7.7,
},
},
)
self.assertEqual(0, a.values[0].lat_degrees)
self.assertEqual(1, a.values[0].lat_minutes)
self.assertEqual(2.2, a.values[0].lat_seconds)
self.assertEqual('N', a.values[0].lat_direction)
self.assertEqual(3, a.values[0].long_degrees)
self.assertEqual(4, a.values[0].long_minutes)
self.assertEqual(5.5, a.values[0].long_seconds)
self.assertEqual('E', a.values[0].long_direction)
self.assertEqual(6.6, a.values[0].altitude)
self.assertEqual(7.7, a.values[0].size)
self.assertEqual(8.8, a.values[0].precision_horz)
self.assertEqual(9.9, a.values[0].precision_vert)
self.assertEqual(s, a.values[0].rdata_text)
def test_loc_value(self):
a = LocValue(
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
)
b = LocValue(
{
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
)
c = LocValue(
{
'lat_degrees': 53,
'lat_minutes': 14,
'lat_seconds': 10,
'lat_direction': 'N',
'long_degrees': 2,
'long_minutes': 18,
'long_seconds': 26,
'long_direction': 'W',
'altitude': 10,
'size': 1,
'precision_horz': 1000,
'precision_vert': 10,
}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b < c)
self.assertTrue(c > a)
self.assertTrue(c > b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b <= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c >= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertEqual(31, a.lat_degrees)
a.lat_degrees = a.lat_degrees + 1
self.assertEqual(32, a.lat_degrees)
self.assertEqual(58, a.lat_minutes)
a.lat_minutes = a.lat_minutes + 1
self.assertEqual(59, a.lat_minutes)
self.assertEqual(52.1, a.lat_seconds)
a.lat_seconds = a.lat_seconds + 1
self.assertEqual(53.1, a.lat_seconds)
self.assertEqual('S', a.lat_direction)
a.lat_direction = 'N'
self.assertEqual('N', a.lat_direction)
self.assertEqual(115, a.long_degrees)
a.long_degrees = a.long_degrees + 1
self.assertEqual(116, a.long_degrees)
self.assertEqual(49, a.long_minutes)
a.long_minutes = a.long_minutes + 1
self.assertEqual(50, a.long_minutes)
self.assertEqual(11.7, a.long_seconds)
a.long_seconds = a.long_seconds + 1
self.assertEqual(12.7, a.long_seconds)
self.assertEqual('E', a.long_direction)
a.long_direction = 'W'
self.assertEqual('W', a.long_direction)
self.assertEqual(20, a.altitude)
a.altitude = a.altitude + 1
self.assertEqual(21, a.altitude)
self.assertEqual(10, a.size)
a.size = a.size + 1
self.assertEqual(11, a.size)
self.assertEqual(10, a.precision_horz)
a.precision_horz = a.precision_horz + 1
self.assertEqual(11, a.precision_horz)
self.assertEqual(2, a.precision_vert)
a.precision_vert = a.precision_vert + 1
self.assertEqual(3, a.precision_vert)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
# missing int key
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(['missing lat_degrees'], ctx.exception.reasons)
# missing float key
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(['missing lat_seconds'], ctx.exception.reasons)
# missing text key
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(['missing lat_direction'], ctx.exception.reasons)
# invalid direction
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'U',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid direction for lat_direction "U"'], ctx.exception.reasons
)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'N',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid direction for long_direction "N"'], ctx.exception.reasons
)
# invalid degrees
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 360,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid value for lat_degrees "360"'], ctx.exception.reasons
)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 'nope',
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(['invalid lat_degrees "nope"'], ctx.exception.reasons)
# invalid minutes
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 60,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid value for lat_minutes "60"'], ctx.exception.reasons
)
# invalid seconds
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 60,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid value for lat_seconds "60"'], ctx.exception.reasons
)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 'nope',
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(['invalid lat_seconds "nope"'], ctx.exception.reasons)
# invalid altitude
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': -666666,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid value for altitude "-666666"'], ctx.exception.reasons
)
# invalid size
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'LOC',
'ttl': 600,
'value': {
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 99999999.99,
'precision_horz': 10,
'precision_vert': 2,
},
},
)
self.assertEqual(
['invalid value for size "99999999.99"'], ctx.exception.reasons
)

+ 265
- 0
tests/test_octodns_record_mx.py View File

@ -0,0 +1,265 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.mx import MxRecord, MxValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordMx(TestCase):
zone = Zone('unit.tests.', [])
def test_mx(self):
a_values = [
MxValue({'preference': 10, 'exchange': 'smtp1.'}),
MxValue({'priority': 20, 'value': 'smtp2.'}),
]
a_data = {'ttl': 30, 'values': a_values}
a = MxRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['preference'], a.values[0].preference)
self.assertEqual(a_values[0]['exchange'], a.values[0].exchange)
self.assertEqual(a_values[1]['preference'], a.values[1].preference)
self.assertEqual(a_values[1]['exchange'], a.values[1].exchange)
a_data['values'][1] = MxValue({'preference': 20, 'exchange': 'smtp2.'})
self.assertEqual(a_data, a.data)
b_value = MxValue({'preference': 0, 'exchange': 'smtp3.'})
b_data = {'ttl': 30, 'value': b_value}
b = MxRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['preference'], b.values[0].preference)
self.assertEqual(b_value['exchange'], b.values[0].exchange)
self.assertEqual(b_data, b.data)
a_upper_values = [
{'preference': 10, 'exchange': 'SMTP1.'},
{'priority': 20, 'value': 'SMTP2.'},
]
a_upper_data = {'ttl': 30, 'values': a_upper_values}
a_upper = MxRecord(self.zone, 'a', a_upper_data)
self.assertEqual(a_upper.data, a.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in preference causes change
other = MxRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].preference = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in value causes change
other.values[0].preference = a.values[0].preference
other.values[0].exchange = 'smtpX'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_mx_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('10 mx.unit.tests. another')
# preference not an int
self.assertEqual(
{'preference': 'abc', 'exchange': 'mx.unit.tests.'},
MxValue.parse_rdata_text('abc mx.unit.tests.'),
)
# valid
self.assertEqual(
{'preference': 10, 'exchange': 'mx.unit.tests.'},
MxValue.parse_rdata_text('10 mx.unit.tests.'),
)
zone = Zone('unit.tests.', [])
a = MxRecord(
zone,
'mx',
{
'ttl': 32,
'values': [
{'preference': 11, 'exchange': 'mail1.unit.tests.'},
{'preference': 12, 'exchange': 'mail2.unit.tests.'},
],
},
)
self.assertEqual(11, a.values[0].preference)
self.assertEqual('mail1.unit.tests.', a.values[0].exchange)
self.assertEqual('11 mail1.unit.tests.', a.values[0].rdata_text)
self.assertEqual(12, a.values[1].preference)
self.assertEqual('mail2.unit.tests.', a.values[1].exchange)
self.assertEqual('12 mail2.unit.tests.', a.values[1].rdata_text)
def test_mx_value(self):
a = MxValue(
{'preference': 0, 'priority': 'a', 'exchange': 'v', 'value': '1'}
)
b = MxValue(
{'preference': 10, 'priority': 'a', 'exchange': 'v', 'value': '2'}
)
c = MxValue(
{'preference': 0, 'priority': 'b', 'exchange': 'z', 'value': '3'}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertEqual(a.__hash__(), a.__hash__())
self.assertNotEqual(a.__hash__(), b.__hash__())
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'exchange': 'foo.bar.com.'},
},
)
# missing preference
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'exchange': 'foo.bar.com.'},
},
)
self.assertEqual(['missing preference'], ctx.exception.reasons)
# invalid preference
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 'nope', 'exchange': 'foo.bar.com.'},
},
)
self.assertEqual(['invalid preference "nope"'], ctx.exception.reasons)
# missing exchange
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'MX', 'ttl': 600, 'value': {'preference': 10}},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'exchange': 'foo.bar.com'},
},
)
self.assertEqual(
['MX value "foo.bar.com" missing trailing .'], ctx.exception.reasons
)
# exchange must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'exchange': '100 foo.bar.com.'},
},
)
self.assertEqual(
['Invalid MX exchange "100 foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 0, 'exchange': '.'},
},
)
self.assertEqual('.', record.values[0].exchange)

+ 438
- 0
tests/test_octodns_record_naptr.py View File

@ -0,0 +1,438 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.naptr import NaptrRecord, NaptrValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordNaptr(TestCase):
zone = Zone('unit.tests.', [])
def test_naptr(self):
a_values = [
NaptrValue(
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
}
),
NaptrValue(
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = NaptrRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
for i in (0, 1):
for k in a_values[0].keys():
self.assertEqual(a_values[i][k], getattr(a.values[i], k))
self.assertEqual(a_data, a.data)
b_value = NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = NaptrRecord(self.zone, 'b', b_data)
for k in a_values[0].keys():
self.assertEqual(b_value[k], getattr(b.values[0], k))
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in priority causes change
other = NaptrRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].order = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in replacement causes change
other.values[0].order = a.values[0].order
other.values[0].replacement = 'smtpX'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# full sorting
# equivalent
b_naptr_value = b.values[0]
self.assertTrue(b_naptr_value == b_naptr_value)
self.assertFalse(b_naptr_value != b_naptr_value)
self.assertTrue(b_naptr_value <= b_naptr_value)
self.assertTrue(b_naptr_value >= b_naptr_value)
# by order
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 10,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 40,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
# by preference
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 30,
'preference': 10,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 30,
'preference': 40,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
# by flags
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'A',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'Z',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
)
# by service
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'A',
'regexp': 'O',
'replacement': 'x',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'Z',
'regexp': 'O',
'replacement': 'x',
}
)
)
# by regexp
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'A',
'replacement': 'x',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'Z',
'replacement': 'x',
}
)
)
# by replacement
self.assertTrue(
b_naptr_value
> NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'a',
}
)
)
self.assertTrue(
b_naptr_value
< NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'z',
}
)
)
# __repr__ doesn't blow up
a.__repr__()
# Hash
v = NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'z',
}
)
o = NaptrValue(
{
'order': 30,
'preference': 32,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'z',
}
)
values = set()
values.add(v)
self.assertTrue(v in values)
self.assertFalse(o in values)
values.add(o)
self.assertTrue(o in values)
self.assertEqual(30, o.order)
o.order = o.order + 1
self.assertEqual(31, o.order)
self.assertEqual(32, o.preference)
o.preference = o.preference + 1
self.assertEqual(33, o.preference)
self.assertEqual('M', o.flags)
o.flags = 'P'
self.assertEqual('P', o.flags)
self.assertEqual('N', o.service)
o.service = 'Q'
self.assertEqual('Q', o.service)
self.assertEqual('O', o.regexp)
o.regexp = 'R'
self.assertEqual('R', o.regexp)
self.assertEqual('z', o.replacement)
o.replacement = '1'
self.assertEqual('1', o.replacement)
def test_naptr_value_rdata_text(self):
# things with the wrong number of words won't parse
for v in (
'',
'one',
'one two',
'one two three',
'one two three four',
'one two three four five',
'one two three four five six seven',
):
with self.assertRaises(RrParseError):
NaptrValue.parse_rdata_text(v)
# we don't care if the types of things are correct when parsing rr text
self.assertEqual(
{
'order': 'one',
'preference': 'two',
'flags': 'three',
'service': 'four',
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rdata_text('one two three four five six'),
)
# order and preference will be converted to int's when possible
self.assertEqual(
{
'order': 1,
'preference': 2,
'flags': 'three',
'service': 'four',
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rdata_text('1 2 three four five six'),
)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = NaptrRecord(
zone,
'naptr',
{
'ttl': 32,
'value': {
'order': 1,
'preference': 2,
'flags': 'S',
'service': 'service',
'regexp': 'regexp',
'replacement': 'replacement',
},
},
)
self.assertEqual(1, a.values[0].order)
self.assertEqual(2, a.values[0].preference)
self.assertEqual('S', a.values[0].flags)
self.assertEqual('service', a.values[0].service)
self.assertEqual('regexp', a.values[0].regexp)
self.assertEqual('replacement', a.values[0].replacement)
s = '1 2 S service regexp replacement'
self.assertEqual(s, a.values[0].rdata_text)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'NAPTR',
'ttl': 600,
'value': {
'order': 10,
'preference': 20,
'flags': 'S',
'service': 'srv',
'regexp': '.*',
'replacement': '.',
},
},
)
# missing X priority
value = {
'order': 10,
'preference': 20,
'flags': 'S',
'service': 'srv',
'regexp': '.*',
'replacement': '.',
}
for k in (
'order',
'preference',
'flags',
'service',
'regexp',
'replacement',
):
v = dict(value)
del v[k]
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'NAPTR', 'ttl': 600, 'value': v}
)
self.assertEqual([f'missing {k}'], ctx.exception.reasons)
# non-int order
v = dict(value)
v['order'] = 'boo'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'NAPTR', 'ttl': 600, 'value': v})
self.assertEqual(['invalid order "boo"'], ctx.exception.reasons)
# non-int preference
v = dict(value)
v['preference'] = 'who'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'NAPTR', 'ttl': 600, 'value': v})
self.assertEqual(['invalid preference "who"'], ctx.exception.reasons)
# unrecognized flags
v = dict(value)
v['flags'] = 'X'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'NAPTR', 'ttl': 600, 'value': v})
self.assertEqual(['unrecognized flags "X"'], ctx.exception.reasons)

+ 83
- 0
tests/test_octodns_record_ns.py View File

@ -0,0 +1,83 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.ns import NsRecord, NsValue
from octodns.record.exception import ValidationError
from octodns.zone import Zone
class TestRecordNs(TestCase):
zone = Zone('unit.tests.', [])
def test_ns(self):
a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
a_data = {'ttl': 30, 'values': a_values}
a = NsRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values, a.values)
self.assertEqual(a_data, a.data)
b_value = '9.8.7.6.'
b_data = {'ttl': 30, 'value': b_value}
b = NsRecord(self.zone, 'b', b_data)
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
def test_ns_value_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, NsValue.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{'type': 'NS', 'ttl': 600, 'values': ['foo.bar.com.', '1.2.3.4.']},
)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'NS', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'NS', 'ttl': 600, 'value': 'foo.bar'}
)
self.assertEqual(
['NS value "foo.bar" missing trailing .'], ctx.exception.reasons
)
# exchange must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{'type': 'NS', 'ttl': 600, 'value': '100 foo.bar.com.'},
)
self.assertEqual(
['Invalid NS value "100 foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)

+ 89
- 0
tests/test_octodns_record_ptr.py View File

@ -0,0 +1,89 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.ptr import PtrRecord, PtrValue
from octodns.record.exception import ValidationError
from octodns.zone import Zone
class TestRecordPtr(TestCase):
zone = Zone('unit.tests.', [])
def test_ptr_lowering_value(self):
upper_record = PtrRecord(
self.zone,
'PtrUppwerValue',
{'ttl': 30, 'type': 'PTR', 'value': 'GITHUB.COM.'},
)
lower_record = PtrRecord(
self.zone,
'PtrLowerValue',
{'ttl': 30, 'type': 'PTR', 'value': 'github.com.'},
)
self.assertEqual(upper_record.value, lower_record.value)
def test_ptr(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(
self.zone, '', {'type': 'PTR', 'ttl': 600, 'value': 'foo.bar.com.'}
)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'PTR', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# empty value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'PTR', 'ttl': 600, 'value': ''})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# not a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'PTR', 'ttl': 600, 'value': '_.'}
)
self.assertEqual(
['Invalid PTR value "_." is not a valid FQDN.'],
ctx.exception.reasons,
)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone, '', {'type': 'PTR', 'ttl': 600, 'value': 'foo.bar'}
)
self.assertEqual(
['PTR value "foo.bar" missing trailing .'], ctx.exception.reasons
)
def test_ptr_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, PtrValue.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = PtrRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)
a = PtrRecord(
zone, 'a', {'ttl': 42, 'values': ['some.target.', 'second.target.']}
)
self.assertEqual('second.target.', a.values[0].rdata_text)
self.assertEqual('some.target.', a.values[1].rdata_text)

+ 71
- 0
tests/test_octodns_record_spf.py View File

@ -0,0 +1,71 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.spf import SpfRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
class TestRecordSpf(TestCase):
zone = Zone('unit.tests.', [])
def assertMultipleValues(self, _type, a_values, b_value):
a_data = {'ttl': 30, 'values': a_values}
a = _type(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values, a.values)
self.assertEqual(a_data, a.data)
b_data = {'ttl': 30, 'value': b_value}
b = _type(self.zone, 'b', b_data)
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
def test_spf(self):
a_values = ['spf1 -all', 'spf1 -hrm']
b_value = 'spf1 -other'
self.assertMultipleValues(SpfRecord, a_values, b_value)
def test_validation(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(
self.zone,
'',
{
'type': 'SPF',
'ttl': 600,
'values': [
'v=spf1 ip4:192.168.0.1/16-all',
'v=spf1 ip4:10.1.2.1/24-all',
'this has some\\; semi-colons\\; in it',
],
},
)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'SPF', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SPF',
'ttl': 600,
'value': 'this has some; semi-colons\\; in it',
},
)
self.assertEqual(
['unescaped ; in "this has some; semi-colons\\; in it"'],
ctx.exception.reasons,
)

+ 432
- 0
tests/test_octodns_record_srv.py View File

@ -0,0 +1,432 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.srv import SrvRecord, SrvValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordSrv(TestCase):
zone = Zone('unit.tests.', [])
def test_srv(self):
a_values = [
SrvValue(
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'}
),
SrvValue(
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SrvRecord(self.zone, '_a._tcp', a_data)
self.assertEqual('_a._tcp', a.name)
self.assertEqual('_a._tcp.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['priority'], a.values[0].priority)
self.assertEqual(a_values[0]['weight'], a.values[0].weight)
self.assertEqual(a_values[0]['port'], a.values[0].port)
self.assertEqual(a_values[0]['target'], a.values[0].target)
self.assertEqual(a_data, a.data)
b_value = SrvValue(
{'priority': 30, 'weight': 31, 'port': 32, 'target': 'server3'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SrvRecord(self.zone, '_b._tcp', b_data)
self.assertEqual(b_value['priority'], b.values[0].priority)
self.assertEqual(b_value['weight'], b.values[0].weight)
self.assertEqual(b_value['port'], b.values[0].port)
self.assertEqual(b_value['target'], b.values[0].target)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in priority causes change
other = SrvRecord(
self.zone, '_a._icmp', {'ttl': 30, 'values': a_values}
)
other.values[0].priority = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in weight causes change
other.values[0].priority = a.values[0].priority
other.values[0].weight = 33
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in port causes change
other.values[0].weight = a.values[0].weight
other.values[0].port = 44
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in target causes change
other.values[0].port = a.values[0].port
other.values[0].target = 'serverX'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_srv_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2 3 4 5')
# priority weight and port not ints
self.assertEqual(
{
'priority': 'one',
'weight': 'two',
'port': 'three',
'target': 'srv.unit.tests.',
},
SrvValue.parse_rdata_text('one two three srv.unit.tests.'),
)
# valid
self.assertEqual(
{
'priority': 1,
'weight': 2,
'port': 3,
'target': 'srv.unit.tests.',
},
SrvValue.parse_rdata_text('1 2 3 srv.unit.tests.'),
)
zone = Zone('unit.tests.', [])
a = SrvRecord(
zone,
'_srv._tcp',
{
'ttl': 32,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'srv.unit.tests.',
},
},
)
self.assertEqual(1, a.values[0].priority)
self.assertEqual(2, a.values[0].weight)
self.assertEqual(3, a.values[0].port)
self.assertEqual('srv.unit.tests.', a.values[0].target)
def test_srv_value(self):
a = SrvValue({'priority': 0, 'weight': 0, 'port': 0, 'target': 'foo.'})
b = SrvValue({'priority': 1, 'weight': 0, 'port': 0, 'target': 'foo.'})
c = SrvValue({'priority': 0, 'weight': 2, 'port': 0, 'target': 'foo.'})
d = SrvValue({'priority': 0, 'weight': 0, 'port': 3, 'target': 'foo.'})
e = SrvValue({'priority': 0, 'weight': 0, 'port': 0, 'target': 'mmm.'})
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertEqual(e, e)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(a, e)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(b, e)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertNotEqual(c, e)
self.assertNotEqual(d, a)
self.assertNotEqual(d, b)
self.assertNotEqual(d, c)
self.assertNotEqual(d, e)
self.assertNotEqual(e, a)
self.assertNotEqual(e, b)
self.assertNotEqual(e, c)
self.assertNotEqual(e, d)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_valiation(self):
# doesn't blow up
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.',
},
},
)
# permit wildcard entries
Record.new(
self.zone,
'*._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'food.bar.baz.',
},
},
)
# invalid name
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'neup',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid name for SRV record'], ctx.exception.reasons)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {'weight': 2, 'port': 3, 'target': 'foo.bar.baz.'},
},
)
self.assertEqual(['missing priority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 'foo',
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons)
# missing weight
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'port': 3,
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['missing weight'], ctx.exception.reasons)
# invalid weight
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 'foo',
'port': 3,
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid weight "foo"'], ctx.exception.reasons)
# missing port
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['missing port'], ctx.exception.reasons)
# invalid port
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 'foo',
'target': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid port "foo"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {'priority': 1, 'weight': 2, 'port': 3},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz',
},
},
)
self.assertEqual(
['SRV value "foo.bar.baz" missing trailing .'],
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '100 foo.bar.com.',
},
},
)
self.assertEqual(
['Invalid SRV target "100 foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)

+ 330
- 0
tests/test_octodns_record_sshfp.py View File

@ -0,0 +1,330 @@
#
#
#
from unittest import TestCase
from octodns.record.base import Record
from octodns.record.exception import ValidationError
from octodns.record.sshfp import SshfpRecord, SshfpValue
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordSshfp(TestCase):
zone = Zone('unit.tests.', [])
def test_sshfp(self):
a_values = [
SshfpValue(
{
'algorithm': 10,
'fingerprint_type': 11,
'fingerprint': 'abc123',
}
),
SshfpValue(
{
'algorithm': 20,
'fingerprint_type': 21,
'fingerprint': 'def456',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SshfpRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['algorithm'], a.values[0].algorithm)
self.assertEqual(
a_values[0]['fingerprint_type'], a.values[0].fingerprint_type
)
self.assertEqual(a_values[0]['fingerprint'], a.values[0].fingerprint)
self.assertEqual(a_data, a.data)
b_value = SshfpValue(
{'algorithm': 30, 'fingerprint_type': 31, 'fingerprint': 'ghi789'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SshfpRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['algorithm'], b.values[0].algorithm)
self.assertEqual(
b_value['fingerprint_type'], b.values[0].fingerprint_type
)
self.assertEqual(b_value['fingerprint'], b.values[0].fingerprint)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in algorithm causes change
other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].algorithm = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in fingerprint_type causes change
other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].algorithm = a.values[0].algorithm
other.values[0].fingerprint_type = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in fingerprint causes change
other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].fingerprint_type = a.values[0].fingerprint_type
other.values[0].fingerprint = 22
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_sshfp_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('0 1 00479b27 another')
# algorithm and fingerprint_type not ints
self.assertEqual(
{
'algorithm': 'one',
'fingerprint_type': 'two',
'fingerprint': '00479b27',
},
SshfpValue.parse_rdata_text('one two 00479b27'),
)
# valid
self.assertEqual(
{'algorithm': 1, 'fingerprint_type': 2, 'fingerprint': '00479b27'},
SshfpValue.parse_rdata_text('1 2 00479b27'),
)
zone = Zone('unit.tests.', [])
a = SshfpRecord(
zone,
'sshfp',
{
'ttl': 32,
'value': {
'algorithm': 1,
'fingerprint_type': 2,
'fingerprint': '00479b27',
},
},
)
self.assertEqual(1, a.values[0].algorithm)
self.assertEqual(2, a.values[0].fingerprint_type)
self.assertEqual('00479b27', a.values[0].fingerprint)
self.assertEqual('1 2 00479b27', a.values[0].rdata_text)
def test_sshfp_value(self):
a = SshfpValue(
{'algorithm': 0, 'fingerprint_type': 0, 'fingerprint': 'abcd'}
)
b = SshfpValue(
{'algorithm': 1, 'fingerprint_type': 0, 'fingerprint': 'abcd'}
)
c = SshfpValue(
{'algorithm': 0, 'fingerprint_type': 1, 'fingerprint': 'abcd'}
)
d = SshfpValue(
{'algorithm': 0, 'fingerprint_type': 0, 'fingerprint': 'bcde'}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertNotEqual(d, a)
self.assertNotEqual(d, b)
self.assertNotEqual(d, c)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
# missing algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(['missing algorithm'], ctx.exception.reasons)
# invalid algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 'nope',
'fingerprint_type': 2,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(['invalid algorithm "nope"'], ctx.exception.reasons)
# unrecognized algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 42,
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(['unrecognized algorithm "42"'], ctx.exception.reasons)
# missing fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 2,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(['missing fingerprint_type'], ctx.exception.reasons)
# invalid fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 3,
'fingerprint_type': 'yeeah',
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(
['invalid fingerprint_type "yeeah"'], ctx.exception.reasons
)
# unrecognized fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 42,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
},
},
)
self.assertEqual(
['unrecognized fingerprint_type "42"'], ctx.exception.reasons
)
# missing fingerprint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'SSHFP',
'ttl': 600,
'value': {'algorithm': 1, 'fingerprint_type': 1},
},
)
self.assertEqual(['missing fingerprint'], ctx.exception.reasons)

+ 31
- 0
tests/test_octodns_record_target.py View File

@ -0,0 +1,31 @@
#
#
#
from unittest import TestCase
from octodns.record.alias import AliasRecord
from octodns.record.target import _TargetValue
from octodns.zone import Zone
class TestRecordTarget(TestCase):
def test_target_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _TargetValue.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.value.rdata_text)

+ 421
- 0
tests/test_octodns_record_tlsa.py View File

@ -0,0 +1,421 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.tlsa import TlsaRecord, TlsaValue
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordTlsa(TestCase):
zone = Zone('unit.tests.', [])
def test_tlsa(self):
a_values = [
TlsaValue(
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
}
),
TlsaValue(
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = TlsaRecord(self.zone, 'a', a_data)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual('a', a.name)
self.assertEqual(30, a.ttl)
self.assertEqual(
a_values[0]['certificate_usage'], a.values[0].certificate_usage
)
self.assertEqual(a_values[0]['selector'], a.values[0].selector)
self.assertEqual(
a_values[0]['matching_type'], a.values[0].matching_type
)
self.assertEqual(
a_values[0]['certificate_association_data'],
a.values[0].certificate_association_data,
)
self.assertEqual(
a_values[1]['certificate_usage'], a.values[1].certificate_usage
)
self.assertEqual(a_values[1]['selector'], a.values[1].selector)
self.assertEqual(
a_values[1]['matching_type'], a.values[1].matching_type
)
self.assertEqual(
a_values[1]['certificate_association_data'],
a.values[1].certificate_association_data,
)
self.assertEqual(a_data, a.data)
b_value = TlsaValue(
{
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = TlsaRecord(self.zone, 'b', b_data)
self.assertEqual(
b_value['certificate_usage'], b.values[0].certificate_usage
)
self.assertEqual(b_value['selector'], b.values[0].selector)
self.assertEqual(b_value['matching_type'], b.values[0].matching_type)
self.assertEqual(
b_value['certificate_association_data'],
b.values[0].certificate_association_data,
)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in certificate_usage causes change
other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].certificate_usage = 0
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in selector causes change
other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].selector = 0
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in matching_type causes change
other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].matching_type = 0
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in certificate_association_data causes change
other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].certificate_association_data = 'AAAAAAAAAAAAA'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
def test_tsla_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2 3 abcd another')
# non-ints
self.assertEqual(
{
'certificate_usage': 'one',
'selector': 'two',
'matching_type': 'three',
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rdata_text('one two three abcd'),
)
# valid
self.assertEqual(
{
'certificate_usage': 1,
'selector': 2,
'matching_type': 3,
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rdata_text('1 2 3 abcd'),
)
zone = Zone('unit.tests.', [])
a = TlsaRecord(
zone,
'tlsa',
{
'ttl': 32,
'value': {
'certificate_usage': 2,
'selector': 1,
'matching_type': 0,
'certificate_association_data': 'abcd',
},
},
)
self.assertEqual(2, a.values[0].certificate_usage)
self.assertEqual(1, a.values[0].selector)
self.assertEqual(0, a.values[0].matching_type)
self.assertEqual('abcd', a.values[0].certificate_association_data)
self.assertEqual('2 1 0 abcd', a.values[0].rdata_text)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
# Multi value, second missing certificate usage
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'values': [
{
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
{
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
],
},
)
self.assertEqual(
['missing certificate_usage'], ctx.exception.reasons
)
# missing certificate_association_data
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
},
},
)
self.assertEqual(
['missing certificate_association_data'], ctx.exception.reasons
)
# missing certificate_usage
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
['missing certificate_usage'], ctx.exception.reasons
)
# False certificate_usage
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 4,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid certificate_usage "{value["certificate_usage"]}"',
ctx.exception.reasons,
)
# Invalid certificate_usage
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 'XYZ',
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid certificate_usage "{value["certificate_usage"]}"',
ctx.exception.reasons,
)
# missing selector
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(['missing selector'], ctx.exception.reasons)
# False selector
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 4,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid selector "{value["selector"]}"', ctx.exception.reasons
)
# Invalid selector
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 'XYZ',
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid selector "{value["selector"]}"', ctx.exception.reasons
)
# missing matching_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 0,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(['missing matching_type'], ctx.exception.reasons)
# False matching_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 1,
'matching_type': 3,
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid matching_type "{value["matching_type"]}"',
ctx.exception.reasons,
)
# Invalid matching_type
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TLSA',
'ttl': 600,
'value': {
'certificate_usage': 0,
'selector': 1,
'matching_type': 'XYZ',
'certificate_association_data': 'AAAAAAAAAAAAA',
},
},
)
self.assertEqual(
'invalid matching_type "{value["matching_type"]}"',
ctx.exception.reasons,
)

+ 144
- 0
tests/test_octodns_record_txt.py View File

@ -0,0 +1,144 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.txt import TxtRecord
from octodns.record.exception import ValidationError
from octodns.zone import Zone
class TestRecordTxt(TestCase):
zone = Zone('unit.tests.', [])
def assertMultipleValues(self, _type, a_values, b_value):
a_data = {'ttl': 30, 'values': a_values}
a = _type(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values, a.values)
self.assertEqual(a_data, a.data)
b_data = {'ttl': 30, 'value': b_value}
b = _type(self.zone, 'b', b_data)
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
def test_txt(self):
a_values = ['a one', 'a two']
b_value = 'b other'
self.assertMultipleValues(TxtRecord, a_values, b_value)
def test_validation(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(
self.zone,
'',
{
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
'this has some\\; semi-colons\\; in it',
],
},
)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'TXT', 'ttl': 600})
self.assertEqual(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'TXT',
'ttl': 600,
'value': 'this has some; semi-colons\\; in it',
},
)
self.assertEqual(
['unescaped ; in "this has some; semi-colons\\; in it"'],
ctx.exception.reasons,
)
def test_long_value_chunking(self):
expected = (
'"Lorem ipsum dolor sit amet, consectetur adipiscing '
'elit, sed do eiusmod tempor incididunt ut labore et dolore '
'magna aliqua. Ut enim ad minim veniam, quis nostrud '
'exercitation ullamco laboris nisi ut aliquip ex ea commodo '
'consequat. Duis aute irure dolor i" "n reprehenderit in '
'voluptate velit esse cillum dolore eu fugiat nulla pariatur. '
'Excepteur sint occaecat cupidatat non proident, sunt in culpa '
'qui officia deserunt mollit anim id est laborum."'
)
long_value = (
'Lorem ipsum dolor sit amet, consectetur adipiscing '
'elit, sed do eiusmod tempor incididunt ut labore et dolore '
'magna aliqua. Ut enim ad minim veniam, quis nostrud '
'exercitation ullamco laboris nisi ut aliquip ex ea commodo '
'consequat. Duis aute irure dolor in reprehenderit in '
'voluptate velit esse cillum dolore eu fugiat nulla '
'pariatur. Excepteur sint occaecat cupidatat non proident, '
'sunt in culpa qui officia deserunt mollit anim id est '
'laborum.'
)
# Single string
single = Record.new(
self.zone,
'',
{
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
long_value,
'this has some\\; semi-colons\\; in it',
],
},
)
self.assertEqual(3, len(single.values))
self.assertEqual(3, len(single.chunked_values))
# Note we are checking that this normalizes the chunking, not that we
# get out what we put in.
self.assertEqual(expected, single.chunked_values[0])
long_split_value = (
'"Lorem ipsum dolor sit amet, consectetur '
'adipiscing elit, sed do eiusmod tempor incididunt ut '
'labore et dolore magna aliqua. Ut enim ad minim veniam, '
'quis nostrud exercitation ullamco laboris nisi ut aliquip '
'ex" " ea commodo consequat. Duis aute irure dolor in '
'reprehenderit in voluptate velit esse cillum dolore eu '
'fugiat nulla pariatur. Excepteur sint occaecat cupidatat '
'non proident, sunt in culpa qui officia deserunt mollit '
'anim id est laborum."'
)
# Chunked
chunked = Record.new(
self.zone,
'',
{
'type': 'TXT',
'ttl': 600,
'values': [
'"hello world"',
long_split_value,
'"this has some\\; semi-colons\\; in it"',
],
},
)
self.assertEqual(expected, chunked.chunked_values[0])
# should be single values, no quoting
self.assertEqual(single.values, chunked.values)
# should be chunked values, with quoting
self.assertEqual(single.chunked_values, chunked.chunked_values)

+ 391
- 0
tests/test_octodns_record_urlfwd.py View File

@ -0,0 +1,391 @@
#
#
#
from unittest import TestCase
from octodns.record import Record
from octodns.record.urlfwd import UrlfwdRecord, UrlfwdValue
from octodns.record.exception import ValidationError
from octodns.zone import Zone
from helpers import SimpleProvider
class TestRecordUrlfwd(TestCase):
zone = Zone('unit.tests.', [])
def test_urlfwd(self):
a_values = [
UrlfwdValue(
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
}
),
UrlfwdValue(
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = UrlfwdRecord(self.zone, 'a', a_data)
self.assertEqual('a', a.name)
self.assertEqual('a.unit.tests.', a.fqdn)
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['path'], a.values[0].path)
self.assertEqual(a_values[0]['target'], a.values[0].target)
self.assertEqual(a_values[0]['code'], a.values[0].code)
self.assertEqual(a_values[0]['masking'], a.values[0].masking)
self.assertEqual(a_values[0]['query'], a.values[0].query)
self.assertEqual(a_values[1]['path'], a.values[1].path)
self.assertEqual(a_values[1]['target'], a.values[1].target)
self.assertEqual(a_values[1]['code'], a.values[1].code)
self.assertEqual(a_values[1]['masking'], a.values[1].masking)
self.assertEqual(a_values[1]['query'], a.values[1].query)
self.assertEqual(a_data, a.data)
b_value = UrlfwdValue(
{
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = UrlfwdRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['path'], b.values[0].path)
self.assertEqual(b_value['target'], b.values[0].target)
self.assertEqual(b_value['code'], b.values[0].code)
self.assertEqual(b_value['masking'], b.values[0].masking)
self.assertEqual(b_value['query'], b.values[0].query)
self.assertEqual(b_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
# Diff in path causes change
other = UrlfwdRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].path = '/change'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in target causes change
other = UrlfwdRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].target = 'http://target'
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in code causes change
other = UrlfwdRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].code = 302
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in masking causes change
other = UrlfwdRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].masking = 0
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# Diff in query causes change
other = UrlfwdRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
other.values[0].query = 1
change = a.changes(other, target)
self.assertEqual(change.existing, a)
self.assertEqual(change.new, other)
# hash
v = UrlfwdValue(
{
'path': '/',
'target': 'http://place',
'code': 301,
'masking': 2,
'query': 0,
}
)
o = UrlfwdValue(
{
'path': '/location',
'target': 'http://redirect',
'code': 302,
'masking': 2,
'query': 0,
}
)
values = set()
values.add(v)
self.assertTrue(v in values)
self.assertFalse(o in values)
values.add(o)
self.assertTrue(o in values)
# __repr__ doesn't blow up
a.__repr__()
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
},
)
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'values': [
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
},
],
},
)
# missing path
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
},
)
self.assertEqual(['missing path'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'code': 301,
'masking': 2,
'query': 0,
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# missing code
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'masking': 2,
'query': 0,
},
},
)
self.assertEqual(['missing code'], ctx.exception.reasons)
# invalid code
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 'nope',
'masking': 2,
'query': 0,
},
},
)
self.assertEqual(['invalid return code "nope"'], ctx.exception.reasons)
# unrecognized code
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 3,
'masking': 2,
'query': 0,
},
},
)
self.assertEqual(
['unrecognized return code "3"'], ctx.exception.reasons
)
# missing masking
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'query': 0,
},
},
)
self.assertEqual(['missing masking'], ctx.exception.reasons)
# invalid masking
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 'nope',
'query': 0,
},
},
)
self.assertEqual(
['invalid masking setting "nope"'], ctx.exception.reasons
)
# unrecognized masking
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 3,
'query': 0,
},
},
)
self.assertEqual(
['unrecognized masking setting "3"'], ctx.exception.reasons
)
# missing query
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
},
},
)
self.assertEqual(['missing query'], ctx.exception.reasons)
# invalid query
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 'nope',
},
},
)
self.assertEqual(
['invalid query setting "nope"'], ctx.exception.reasons
)
# unrecognized query
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'URLFWD',
'ttl': 600,
'value': {
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 3,
},
},
)
self.assertEqual(
['unrecognized query setting "3"'], ctx.exception.reasons
)

Loading…
Cancel
Save