# # # from __future__ import absolute_import, division, print_function, \ unicode_literals from octodns.record import Create, Delete, Update, Record from octodns.provider.googlecloud import GoogleCloudProvider, \ _GoogleCloudRecordSetMaker from octodns.zone import Zone from octodns.provider.base import Plan, BaseProvider from unittest import TestCase from mock import Mock, patch, PropertyMock zone = Zone(name='unit.tests.', sub_zones=[]) octo_records = [] octo_records.append(Record.new(zone, '', { 'ttl': 0, 'type': 'A', 'values': ['1.2.3.4', '10.10.10.10']})) octo_records.append(Record.new(zone, 'a', { 'ttl': 1, 'type': 'A', 'values': ['1.2.3.4', '1.1.1.1']})) octo_records.append(Record.new(zone, 'aa', { 'ttl': 9001, 'type': 'A', 'values': ['1.2.4.3']})) octo_records.append(Record.new(zone, 'aaa', { 'ttl': 2, 'type': 'A', 'values': ['1.1.1.3']})) octo_records.append(Record.new(zone, 'cname', { 'ttl': 3, 'type': 'CNAME', 'value': 'a.unit.tests.'})) octo_records.append(Record.new(zone, 'mx1', { 'ttl': 3, 'type': 'MX', 'values': [{ 'priority': 10, 'value': 'mx1.unit.tests.', }, { 'priority': 20, 'value': 'mx2.unit.tests.', }]})) octo_records.append(Record.new(zone, 'mx2', { 'ttl': 3, 'type': 'MX', 'values': [{ 'priority': 10, 'value': 'mx1.unit.tests.', }]})) octo_records.append(Record.new(zone, '', { 'ttl': 4, 'type': 'NS', 'values': ['ns1.unit.tests.', 'ns2.unit.tests.']})) octo_records.append(Record.new(zone, 'foo', { 'ttl': 5, 'type': 'NS', 'value': 'ns1.unit.tests.'})) octo_records.append(Record.new(zone, '_srv._tcp', { 'ttl': 6, 'type': 'SRV', 'values': [{ 'priority': 10, 'weight': 20, 'port': 30, 'target': 'foo-1.unit.tests.', }, { 'priority': 12, 'weight': 30, 'port': 30, 'target': 'foo-2.unit.tests.', }]})) octo_records.append(Record.new(zone, '_srv2._tcp', { 'ttl': 7, 'type': 'SRV', 'values': [{ 'priority': 12, 'weight': 17, 'port': 1, 'target': 'srvfoo.unit.tests.', }]})) octo_records.append(Record.new(zone, 'txt1', { 'ttl': 8, 'type': 'TXT', 'value': 'txt singleton test'})) octo_records.append(Record.new(zone, 'txt2', { 'ttl': 9, 'type': 'TXT', 'values': ['txt multiple test', 'txt multiple test 2']})) octo_records.append(Record.new(zone, 'naptr', { 'ttl': 9, 'type': 'NAPTR', 'values': [{ 'order': 100, 'preference': 10, 'flags': 'S', 'service': 'SIP+D2U', 'regexp': "!^.*$!sip:customer-service@unit.tests!", 'replacement': '_sip._udp.unit.tests.' }]})) octo_records.append(Record.new(zone, 'caa', { 'ttl': 9, 'type': 'CAA', 'value': { 'flags': 0, 'tag': 'issue', 'value': 'ca.unit.tests', }})) for record in octo_records: zone.add_record(record) # This is the format which the google API likes. resource_record_sets = [ ('unit.tests.', u'A', 0, [u'1.2.3.4', u'10.10.10.10']), (u'a.unit.tests.', u'A', 1, [u'1.1.1.1', u'1.2.3.4']), (u'aa.unit.tests.', u'A', 9001, [u'1.2.4.3']), (u'aaa.unit.tests.', u'A', 2, [u'1.1.1.3']), (u'cname.unit.tests.', u'CNAME', 3, [u'a.unit.tests.']), (u'mx1.unit.tests.', u'MX', 3, [u'10 mx1.unit.tests.', u'20 mx2.unit.tests.']), (u'mx2.unit.tests.', u'MX', 3, [u'10 mx1.unit.tests.']), ('unit.tests.', u'NS', 4, [u'ns1.unit.tests.', u'ns2.unit.tests.']), (u'foo.unit.tests.', u'NS', 5, [u'ns1.unit.tests.']), (u'_srv._tcp.unit.tests.', u'SRV', 6, [u'10 20 30 foo-1.unit.tests.', u'12 30 30 foo-2.unit.tests.']), (u'_srv2._tcp.unit.tests.', u'SRV', 7, [u'12 17 1 srvfoo.unit.tests.']), (u'txt1.unit.tests.', u'TXT', 8, [u'txt singleton test']), (u'txt2.unit.tests.', u'TXT', 9, [u'txt multiple test', u'txt multiple test 2']), (u'naptr.unit.tests.', u'NAPTR', 9, [ u'100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"' u' _sip._udp.unit.tests.']), (u'caa.unit.tests.', u'CAA', 9, [u'0 issue ca.unit.tests']) ] class DummyResourceRecordSet: def __init__(self, record_name, record_type, ttl, rrdatas): self.name = record_name self.record_type = record_type self.ttl = ttl self.rrdatas = rrdatas def __eq__(self, other): try: return self.name == other.name \ and self.record_type == other.record_type \ and self.ttl == other.ttl \ and sorted(self.rrdatas) == sorted(other.rrdatas) except: return False def __repr__(self): return "{} {} {} {!s}"\ .format(self.name, self.record_type, self.ttl, self.rrdatas) def __hash__(self): return hash(repr(self)) class DummyGoogleCloudZone: def __init__(self, dns_name): self.dns_name = dns_name def resource_record_set(self, *args): return DummyResourceRecordSet(*args) def list_resource_record_sets(self, *args): pass class DummyIterator: """Returns a mock DummyIterator object to use in testing. This is because API calls for google cloud DNS, if paged, contains a "next_page_token", which can be used to grab a subsequent iterator with more results. :type return: DummyIterator """ def __init__(self, list_of_stuff, page_token=None): self.iterable = iter(list_of_stuff) self.next_page_token = page_token def __iter__(self): return self def next(self): return self.iterable.next() class TestGoogleCloudRecordSetMaker(TestCase): def test_get_record_set(self): mz = DummyGoogleCloudZone('unit.tests.') record_sets = [] for record in octo_records: mm = _GoogleCloudRecordSetMaker(mz, record) record_sets.append(mm.get_record_set()) self.assertEqual( len(octo_records), len(record_sets)) class TestGoogleCloudProvider(TestCase): @patch('octodns.provider.googlecloud.dns') def _get_provider(*args): '''Returns a mock GoogleCloudProvider object to use in testing. :type return: GoogleCloudProvider ''' return GoogleCloudProvider(id=1, project="mock") @patch('octodns.provider.googlecloud.time.sleep') @patch('octodns.provider.googlecloud.dns') def test___init__(self, *_): self.assertIsInstance(GoogleCloudProvider(id=1, credentials_file="test", project="unit test"), BaseProvider) self.assertIsInstance(GoogleCloudProvider(id=1), BaseProvider) @patch('octodns.provider.googlecloud.time.sleep') @patch('octodns.provider.googlecloud.dns') def test__apply(self, *_): class DummyDesired: def __init__(self, name, changes): self.name = name self.changes = changes apply_z = Zone("unit.tests.", []) create_r = Record.new(apply_z, '', { 'ttl': 0, 'type': 'A', 'values': ['1.2.3.4', '10.10.10.10']}) delete_r = Record.new(apply_z, 'a', { 'ttl': 1, 'type': 'A', 'values': ['1.2.3.4', '1.1.1.1']}) update_existing_r = Record.new(apply_z, 'aa', { 'ttl': 9001, 'type': 'A', 'values': ['1.2.4.3']}) update_new_r = Record.new(apply_z, 'aa', { 'ttl': 666, 'type': 'A', 'values': ['1.4.3.2']}) gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.") status_mock = Mock() return_values_for_status = iter( ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 'done']) type(status_mock).status = PropertyMock( side_effect=return_values_for_status.next) gcloud_zone_mock.changes = Mock(return_value=status_mock) provider = self._get_provider() provider.gcloud_client = Mock() provider._get_gcloud_zone = Mock( return_value=gcloud_zone_mock) desired = Mock() desired.name = Mock(return_value="unit.tests.") changes = [] changes.append(Create(create_r)) changes.append(Delete(delete_r)) changes.append(Update(existing=update_existing_r, new=update_new_r)) provider.apply(Plan( existing=[update_existing_r, delete_r], desired=desired, changes=changes )) calls_mock = gcloud_zone_mock.changes.return_value mocked_calls = [] for mock_call in calls_mock.add_record_set.mock_calls: mocked_calls.append(mock_call[1][0]) self.assertEqual(mocked_calls, [ DummyResourceRecordSet( 'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']), DummyResourceRecordSet( 'aa.unit.tests.', 'A', 666, ['1.4.3.2']) ]) mocked_calls2 = [] for mock_call in calls_mock.delete_record_set.mock_calls: mocked_calls2.append(mock_call[1][0]) self.assertEqual(mocked_calls2, [ DummyResourceRecordSet( 'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']), DummyResourceRecordSet( 'aa.unit.tests.', 'A', 9001, ['1.2.4.3']) ]) unsupported_change = Mock() unsupported_change.__len__ = Mock(return_value=1) mock_plan = Mock() type(mock_plan).desired = PropertyMock(return_value=DummyDesired( "dummy name", [])) type(mock_plan).changes = [unsupported_change] with self.assertRaises(RuntimeError): provider.apply(mock_plan) def test__record_to_record_set(self): provider = self._get_provider() gcloud_zone = DummyGoogleCloudZone('unit.tests.') for record in octo_records: self.assertIsNotNone(provider._record_to_record_set( gcloud_zone, record)) def test__get_gcloud_client(self): provider = self._get_provider() self.assertIsInstance(provider, GoogleCloudProvider) @patch('octodns.provider.googlecloud.dns') def test_populate(self, _): def _get_mock_zones(page_token=None): if not page_token: return DummyIterator([ DummyGoogleCloudZone('example.com.'), DummyGoogleCloudZone('example2.com.'), ], page_token="DUMMY_PAGE_TOKEN") return DummyIterator([ google_cloud_zone ]) def _get_mock_record_sets(page_token=None): if not page_token: return DummyIterator( [DummyResourceRecordSet(*v) for v in resource_record_sets[:5]], page_token="DUMMY_PAGE_TOKEN") return DummyIterator( [DummyResourceRecordSet(*v) for v in resource_record_sets[5:]]) google_cloud_zone = DummyGoogleCloudZone('unit.tests.') provider = self._get_provider() provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones) google_cloud_zone.list_resource_record_sets = Mock( side_effect=_get_mock_record_sets) self.assertEqual(provider._get_gcloud_zone("unit.tests.").dns_name, "unit.tests.") test_zone = Zone('unit.tests.', []) provider.populate(test_zone) # test_zone gets fed the same records as zone does, except it's in # the format returned by google API, so after populate they should look # excactly the same. self.assertEqual(test_zone.records, zone.records) test_zone2 = Zone('nonexistant.zone.', []) provider.populate(test_zone2, False, False) self.assertEqual(len(test_zone2.records), 0, msg="Zone should not get records from wrong domain") provider.SUPPORTS = set() test_zone3 = Zone('unit.tests.', []) provider.populate(test_zone3) self.assertEqual(len(test_zone3.records), 0) @patch('octodns.provider.googlecloud.dns') def test_populate_corner_cases(self, _): provider = self._get_provider() test_zone = Zone('unit.tests.', []) not_same_fqdn = DummyResourceRecordSet( 'unit.tests.gr', u'A', 0, [u'1.2.3.4']), provider._get_gcloud_records = Mock( side_effect=[not_same_fqdn]) provider._get_gcloud_zone = Mock(return_value=DummyGoogleCloudZone( dns_name="unit.tests.")) provider.populate(test_zone) self.assertEqual(len(test_zone.records), 1) self.assertEqual(test_zone.records.pop().fqdn, u'unit.tests.gr.unit.tests.') def test__get_gcloud_zone(self): provider = self._get_provider() provider.gcloud_client = Mock() provider.gcloud_client.list_zones = Mock( return_value=DummyIterator([])) self.assertIsNone(provider._get_gcloud_zone("nonexistant.xone"), msg="Check that nonexistant zones return None when" "there's no create=True flag") def test__create_zone(self): provider = self._get_provider() provider.gcloud_client = Mock() provider.gcloud_client.list_zones = Mock( return_value=DummyIterator([])) mock_zone = provider._get_gcloud_zone( 'nonexistant.zone.mock', create=True) mock_zone.create.assert_called() provider.gcloud_client.zone.assert_called() provider.gcloud_client.zone.assert_called_once_with( dns_name=u'nonexistant.zone.mock', name=u'nonexistant-zone-moc') def test__create_zone_with_numbers_in_name(self): provider = self._get_provider() provider.gcloud_client = Mock() provider.gcloud_client.list_zones = Mock( return_value=DummyIterator([])) provider._get_gcloud_zone( '111.', create=True) provider.gcloud_client.zone.assert_called_once_with( dns_name=u'111.', name=u'a111')