From 4cae1e2bdb71b9e4d4298c727a51932ec3ef8b67 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Sat, 26 Aug 2017 08:18:17 -0700 Subject: [PATCH] Add CAA Record class and tests --- octodns/record.py | 75 +++++++++++++++---- tests/test_octodns_record.py | 138 ++++++++++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 18 deletions(-) diff --git a/octodns/record.py b/octodns/record.py index 6ee9dff..cc9949f 100644 --- a/octodns/record.py +++ b/octodns/record.py @@ -81,29 +81,16 @@ class Record(object): 'A': ARecord, 'AAAA': AaaaRecord, 'ALIAS': AliasRecord, - # cert + 'CAA': CaaRecord, 'CNAME': CnameRecord, - # dhcid - # dname - # dnskey - # ds - # ipseckey - # key - # kx - # loc 'MX': MxRecord, 'NAPTR': NaptrRecord, 'NS': NsRecord, - # nsap 'PTR': PtrRecord, - # px - # rp - # soa - would it even make sense? 'SPF': SpfRecord, 'SRV': SrvRecord, 'SSHFP': SshfpRecord, 'TXT': TxtRecord, - # url }[_type] except KeyError: raise Exception('Unknown record type: "{}"'.format(_type)) @@ -398,6 +385,66 @@ class AliasRecord(_ValueMixin, Record): return value +class CaaValue(object): + # https://tools.ietf.org/html/rfc6844#page-5 + + @classmethod + def _validate_value(cls, value): + reasons = [] + try: + flags = int(value.get('flags', 0)) + if flags not in (0, 1): + reasons.append('invalid flags "{}"'.format(flags)) + except ValueError: + reasons.append('invalid flags "{}"'.format(value['flags'])) + + try: + tag = value['tag'] + if tag not in ('issue', 'issuewild', 'iodef'): + reasons.append('invalid tag "{}"'.format(tag)) + except KeyError: + reasons.append('missing tag') + + if 'value' not in value: + reasons.append('missing value') + + return reasons + + def __init__(self, value): + self.flags = int(value.get('flags', 0)) + self.tag = value['tag'] + self.value = value['value'] + + @property + def data(self): + return { + 'flags': self.flags, + 'tag': self.tag, + 'value': self.value, + } + + def __cmp__(self, other): + if self.flags == other.flags: + if self.tag == other.tag: + return cmp(self.value, other.value) + return cmp(self.tag, other.tag) + return cmp(self.flags, other.flags) + + def __repr__(self): + return "'{} {} {}'".format(self.flags, self.tag, self.value) + + +class CaaRecord(_ValuesMixin, Record): + _type = 'CAA' + + @classmethod + def _validate_value(cls, value): + return CaaValue._validate_value(value) + + def _process_values(self, values): + return [CaaValue(v) for v in values] + + class CnameRecord(_ValueMixin, Record): _type = 'CNAME' diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 1d64081..10e3869 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -7,10 +7,10 @@ from __future__ import absolute_import, division, print_function, \ from unittest import TestCase -from octodns.record import ARecord, AaaaRecord, AliasRecord, CnameRecord, \ - Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, NsRecord, \ - Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update, \ - ValidationError +from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \ + CnameRecord, Create, Delete, GeoValue, MxRecord, NaptrRecord, \ + NaptrValue, NsRecord, Record, SshfpRecord, SpfRecord, SrvRecord, \ + TxtRecord, Update, ValidationError from octodns.zone import Zone from helpers import GeoProvider, SimpleProvider @@ -206,6 +206,66 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_caa(self): + a_values = [{ + 'flags': 0, + 'tag': 'issue', + 'value': 'ca.example.net', + }, { + 'flags': 1, + 'tag': 'iodef', + 'value': 'mailto:security@example.com', + }] + a_data = {'ttl': 30, 'values': a_values} + a = CaaRecord(self.zone, 'a', a_data) + self.assertEquals('a', a.name) + self.assertEquals('a.unit.tests.', a.fqdn) + self.assertEquals(30, a.ttl) + self.assertEquals(a_values[0]['flags'], a.values[0].flags) + self.assertEquals(a_values[0]['tag'], a.values[0].tag) + self.assertEquals(a_values[0]['value'], a.values[0].value) + self.assertEquals(a_values[1]['flags'], a.values[1].flags) + self.assertEquals(a_values[1]['tag'], a.values[1].tag) + self.assertEquals(a_values[1]['value'], a.values[1].value) + self.assertEquals(a_data, a.data) + + b_value = { + 'tag': 'iodef', + 'value': 'http://iodef.example.com/', + } + b_data = {'ttl': 30, 'value': b_value} + b = CaaRecord(self.zone, 'b', b_data) + self.assertEquals(0, b.values[0].flags) + self.assertEquals(b_value['tag'], b.values[0].tag) + self.assertEquals(b_value['value'], b.values[0].value) + b_data['value']['flags'] = 0 + self.assertEquals(b_data, b.data) + + target = SimpleProvider() + # No changes with self + self.assertFalse(a.changes(a, target)) + # Diff in flags causes change + other = CaaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].flags = 1 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in tag causes change + other.values[0].flags = a.values[0].flags + other.values[0].tag = 'foo' + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in value causes change + other.values[0].tag = a.values[0].tag + other.values[0].value = 'bar' + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + + # __repr__ doesn't blow up + a.__repr__() + def test_cname(self): self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.') @@ -861,6 +921,76 @@ class TestRecordValidation(TestCase): }) self.assertEquals(['missing trailing .'], ctx.exception.reasons) + def test_CAA(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'flags': 1, + 'tag': 'iodef', + 'value': 'http://foo.bar.com/' + } + }) + + # invalid flags + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'flags': 42, + 'tag': 'iodef', + 'value': 'http://foo.bar.com/', + } + }) + self.assertEquals(['invalid flags "42"'], ctx.exception.reasons) + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'flags': 'nope', + 'tag': 'iodef', + 'value': 'http://foo.bar.com/', + } + }) + self.assertEquals(['invalid flags "nope"'], ctx.exception.reasons) + + # missing tag + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'value': 'http://foo.bar.com/', + } + }) + self.assertEquals(['missing tag'], ctx.exception.reasons) + + # invalid tag + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'tag': 'xyz', + 'value': 'http://foo.bar.com/', + } + }) + self.assertEquals(['invalid tag "xyz"'], ctx.exception.reasons) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CAA', + 'ttl': 600, + 'value': { + 'tag': 'iodef', + } + }) + self.assertEquals(['missing value'], ctx.exception.reasons) + def test_CNAME(self): # doesn't blow up Record.new(self.zone, 'www', {