You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

202 lines
6.6 KiB

#
#
#
from unittest import TestCase
from unittest.mock import patch
from octodns import __VERSION__
from octodns.processor.meta import MetaProcessor
from octodns.provider.plan import Plan
from octodns.record import Create, Record, Update
from octodns.zone import Zone
class TestMetaProcessor(TestCase):
zone = Zone('unit.tests.', [])
meta_needs_update = Record.new(
zone,
'meta',
{
'type': 'TXT',
'ttl': 60,
# will always need updating
'values': ['uuid'],
},
)
meta_up_to_date = Record.new(
zone,
'meta',
{
'type': 'TXT',
'ttl': 60,
# only has time, value should be ignored
'values': ['time=xxx'],
},
)
not_meta = Record.new(
zone,
'its-not-meta',
{
'type': 'TXT',
'ttl': 60,
# has time, but name is wrong so won't matter
'values': ['time=xyz'],
},
)
@patch('octodns.processor.meta.MetaProcessor.now')
@patch('octodns.processor.meta.MetaProcessor.uuid')
def test_args_and_values(self, uuid_mock, now_mock):
# defaults, just time
uuid_mock.side_effect = [Exception('not used')]
now_mock.side_effect = ['the-time']
proc = MetaProcessor('test')
self.assertEqual(['time=the-time'], proc.values)
# just uuid
uuid_mock.side_effect = ['abcdef-1234567890']
now_mock.side_effect = [Exception('not used')]
proc = MetaProcessor('test', include_time=False, include_uuid=True)
self.assertEqual(['uuid=abcdef-1234567890'], proc.values)
# just version
uuid_mock.side_effect = [Exception('not used')]
now_mock.side_effect = [Exception('not used')]
proc = MetaProcessor('test', include_time=False, include_version=True)
self.assertEqual([f'octodns-version={__VERSION__}'], proc.values)
# just provider
proc = MetaProcessor('test', include_time=False, include_provider=True)
self.assertTrue(proc.include_provider)
self.assertFalse(proc.values)
# everything
uuid_mock.side_effect = ['abcdef-1234567890']
now_mock.side_effect = ['the-time']
proc = MetaProcessor(
'test',
include_time=True,
include_uuid=True,
include_version=True,
include_provider=True,
)
self.assertEqual(
[
f'octodns-version={__VERSION__}',
'time=the-time',
'uuid=abcdef-1234567890',
],
proc.values,
)
self.assertTrue(proc.include_provider)
def test_uuid(self):
proc = MetaProcessor('test', include_time=False, include_uuid=True)
self.assertEqual(1, len(proc.values))
self.assertTrue(proc.values[0].startswith('uuid'))
# uuid's have 4 -
self.assertEqual(4, proc.values[0].count('-'))
def test_up_to_date(self):
proc = MetaProcessor('test')
# Creates always need to happen
self.assertFalse(proc._up_to_date(Create(self.meta_needs_update)))
self.assertFalse(proc._up_to_date(Create(self.meta_up_to_date)))
# Updates depend on the contents
self.assertFalse(proc._up_to_date(Update(self.meta_needs_update, None)))
self.assertTrue(proc._up_to_date(Update(self.meta_up_to_date, None)))
@patch('octodns.processor.meta.MetaProcessor.now')
def test_process_source_zone(self, now_mock):
now_mock.side_effect = ['the-time']
proc = MetaProcessor('test')
# meta record was added
desired = self.zone.copy()
processed = proc.process_source_zone(desired, None)
record = next(iter(processed.records))
self.assertEqual(self.meta_up_to_date, record)
self.assertEqual(['time=the-time'], record.values)
def test_process_target_zone(self):
proc = MetaProcessor('test')
# with defaults, not enabled
zone = self.zone.copy()
processed = proc.process_target_zone(zone, None)
self.assertFalse(processed.records)
# enable provider
proc = MetaProcessor('test', include_provider=True)
class DummyTarget:
id = 'dummy'
# enabled provider, no meta record, shouldn't happen, but also shouldn't
# blow up
processed = proc.process_target_zone(zone, DummyTarget())
self.assertFalse(processed.records)
# enabled provider, should now look for and update the provider value,
# - only record so nothing to skip over
# - time value in there to be skipped over
proc = MetaProcessor('test', include_provider=True)
zone = self.zone.copy()
meta = self.meta_up_to_date.copy()
zone.add_record(meta)
processed = proc.process_target_zone(zone, DummyTarget())
record = next(iter(processed.records))
self.assertEqual(['provider=dummy', 'time=xxx'], record.values)
# add another unrelated record that needs to be skipped
proc = MetaProcessor('test', include_provider=True)
zone = self.zone.copy()
meta = self.meta_up_to_date.copy()
zone.add_record(meta)
zone.add_record(self.not_meta)
processed = proc.process_target_zone(zone, DummyTarget())
self.assertEqual(2, len(processed.records))
record = [r for r in processed.records if r.name == proc.record_name][0]
self.assertEqual(['provider=dummy', 'time=xxx'], record.values)
def test_process_plan(self):
proc = MetaProcessor('test')
# no plan, shouldn't happen, but we shouldn't blow up
self.assertFalse(proc.process_plan(None, None, None))
# plan with just an up to date meta record, should kill off the plan
plan = Plan(
None,
None,
[Update(self.meta_up_to_date, self.meta_needs_update)],
True,
)
self.assertFalse(proc.process_plan(plan, None, None))
# plan with an out of date meta record, should leave the plan alone
plan = Plan(
None,
None,
[Update(self.meta_needs_update, self.meta_up_to_date)],
True,
)
self.assertEqual(plan, proc.process_plan(plan, None, None))
# plan with other changes preserved even if meta was somehow up to date
plan = Plan(
None,
None,
[
Update(self.meta_up_to_date, self.meta_needs_update),
Create(self.not_meta),
],
True,
)
self.assertEqual(plan, proc.process_plan(plan, None, None))