diff --git a/octodns/manager.py b/octodns/manager.py index e6fe253..8439eb6 100644 --- a/octodns/manager.py +++ b/octodns/manager.py @@ -6,7 +6,7 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals from StringIO import StringIO -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from importlib import import_module from os import environ import logging @@ -38,6 +38,17 @@ class _AggregateTarget(object): return True +class MakeThreadFuture(object): + + def __init__(self, func, args, kwargs): + self.func = func + self.args = args + self.kwargs = kwargs + + def result(self): + return self.func(*self.args, **self.kwargs) + + class MainThreadExecutor(object): ''' Dummy executor that runs things on the main thread during the involcation @@ -48,13 +59,7 @@ class MainThreadExecutor(object): ''' def submit(self, func, *args, **kwargs): - future = Future() - try: - future.set_result(func(*args, **kwargs)) - except Exception as e: - # TODO: get right stacktrace here - future.set_exception(e) - return future + return MakeThreadFuture(func, args, kwargs) class Manager(object): diff --git a/octodns/provider/ns1.py b/octodns/provider/ns1.py index 2f0a024..65db64c 100644 --- a/octodns/provider/ns1.py +++ b/octodns/provider/ns1.py @@ -7,7 +7,8 @@ from __future__ import absolute_import, division, print_function, \ from logging import getLogger from nsone import NSONE -from nsone.rest.errors import ResourceException +from nsone.rest.errors import RateLimitException, ResourceException +from time import sleep from ..record import Record from .base import BaseProvider @@ -171,7 +172,14 @@ class Ns1Provider(BaseProvider): name = self._get_name(new) _type = new._type params = getattr(self, '_params_for_{}'.format(_type))(new) - getattr(nsone_zone, 'add_{}'.format(_type))(name, **params) + meth = getattr(nsone_zone, 'add_{}'.format(_type)) + try: + meth(name, **params) + except RateLimitException as e: + self.log.warn('_apply_Create: rate limit encountered, pausing ' + 'for %ds and trying again', e.period) + sleep(e.period) + meth(name, **params) def _apply_Update(self, nsone_zone, change): existing = change.existing @@ -180,14 +188,26 @@ class Ns1Provider(BaseProvider): record = nsone_zone.loadRecord(name, _type) new = change.new params = getattr(self, '_params_for_{}'.format(_type))(new) - record.update(**params) + try: + record.update(**params) + except RateLimitException as e: + self.log.warn('_apply_Update: rate limit encountered, pausing ' + 'for %ds and trying again', e.period) + sleep(e.period) + record.update(**params) def _apply_Delete(self, nsone_zone, change): existing = change.existing name = self._get_name(existing) _type = existing._type record = nsone_zone.loadRecord(name, _type) - record.delete() + try: + record.delete() + except RateLimitException as e: + self.log.warn('_apply_Delete: rate limit encountered, pausing ' + 'for %ds and trying again', e.period) + sleep(e.period) + record.delete() def _apply(self, plan): desired = plan.desired diff --git a/requirements.txt b/requirements.txt index 62c485a..9eb8284 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,7 @@ incf.countryutils==1.0 ipaddress==1.0.18 jmespath==0.9.0 natsort==5.0.3 -nsone==0.9.10 +nsone==0.9.14 python-dateutil==2.6.0 requests==2.13.0 s3transfer==0.1.10 diff --git a/tests/test_octodns_provider_ns1.py b/tests/test_octodns_provider_ns1.py index ecc107c..0398459 100644 --- a/tests/test_octodns_provider_ns1.py +++ b/tests/test_octodns_provider_ns1.py @@ -6,7 +6,8 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals from mock import Mock, call, patch -from nsone.rest.errors import AuthException, ResourceException +from nsone.rest.errors import AuthException, RateLimitException, \ + ResourceException from unittest import TestCase from octodns.record import Delete, Record, Update @@ -225,7 +226,15 @@ class TestNs1Provider(TestCase): create_mock.reset_mock() load_mock.side_effect = \ ResourceException('server error: zone not found') - create_mock.side_effect = None + # ugh, need a mock zone with a mock prop since we're using getattr, we + # can actually control side effects on `meth` with that. + mock_zone = Mock() + mock_zone.add_SRV = Mock() + mock_zone.add_SRV.side_effect = [ + RateLimitException('boo', period=0), + None, + ] + create_mock.side_effect = [mock_zone] got_n = provider.apply(plan) self.assertEquals(expected_n, got_n) @@ -245,12 +254,26 @@ class TestNs1Provider(TestCase): self.assertEquals(2, len(plan.changes)) self.assertIsInstance(plan.changes[0], Update) self.assertIsInstance(plan.changes[1], Delete) - + # ugh, we need a mock record that can be returned from loadRecord for + # the update and delete targets, we can add our side effects to that to + # trigger rate limit handling + mock_record = Mock() + mock_record.update.side_effect = [ + RateLimitException('one', period=0), + None, + ] + mock_record.delete.side_effect = [ + RateLimitException('two', period=0), + None, + ] + nsone_zone.loadRecord.side_effect = [mock_record, mock_record] got_n = provider.apply(plan) self.assertEquals(2, got_n) nsone_zone.loadRecord.assert_has_calls([ call('unit.tests', u'A'), - call().update(answers=[u'1.2.3.4'], ttl=32), call('delete-me', u'A'), - call().delete() + ]) + mock_record.assert_has_calls([ + call.update(answers=[u'1.2.3.4'], ttl=32), + call.delete() ])