Browse Source

Merge pull request #1253 from octodns/escaped-semi

Add unescaped semicolons support to YamlProvider
pull/1277/head
Ross McFarland 5 months ago
committed by GitHub
parent
commit
5638d40412
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
8 changed files with 110 additions and 2 deletions
  1. +4
    -0
      .changelog/7a30ee9fb9dd433c9181ddd5628e5cd3.md
  2. +4
    -0
      .changelog/85710a9264524662becdc7e52e71e241.md
  3. +24
    -1
      octodns/provider/yaml.py
  4. +3
    -1
      octodns/record/chunked.py
  5. +9
    -0
      tests/config-semis/escaped.semis.yaml
  6. +9
    -0
      tests/config-semis/unescaped.semis.yaml
  7. +51
    -0
      tests/test_octodns_provider_yaml.py
  8. +6
    -0
      tests/test_octodns_record_chunked.py

+ 4
- 0
.changelog/7a30ee9fb9dd433c9181ddd5628e5cd3.md View File

@ -0,0 +1,4 @@
---
type: minor
---
Add unescaped semicolons support to YamlProvider

+ 4
- 0
.changelog/85710a9264524662becdc7e52e71e241.md View File

@ -0,0 +1,4 @@
---
type: minor
---
Add validation to TXT records to check for double escaped semi-colons

+ 24
- 1
octodns/provider/yaml.py View File

@ -73,6 +73,10 @@ class YamlProvider(BaseProvider):
# (optional, default False) # (optional, default False)
disable_zonefile: false disable_zonefile: false
# Whether or not ; in values, e.g. TXT, need to be escaped \\;
# (optional, default True)
escaped_semicolons: True
Note Note
---- ----
@ -192,13 +196,14 @@ class YamlProvider(BaseProvider):
split_catchall=True, split_catchall=True,
shared_filename=False, shared_filename=False,
disable_zonefile=False, disable_zonefile=False,
escaped_semicolons=True,
*args, *args,
**kwargs, **kwargs,
): ):
klass = self.__class__.__name__ klass = self.__class__.__name__
self.log = logging.getLogger(f'{klass}[{id}]') self.log = logging.getLogger(f'{klass}[{id}]')
self.log.debug( self.log.debug(
'__init__: id=%s, directory=%s, default_ttl=%d, enforce_order=%d, order_mode=%s, populate_should_replace=%s, supports_root_ns=%s, split_extension=%s, split_catchall=%s, shared_filename=%s, disable_zonefile=%s',
'__init__: id=%s, directory=%s, default_ttl=%d, enforce_order=%d, order_mode=%s, populate_should_replace=%s, supports_root_ns=%s, split_extension=%s, split_catchall=%s, shared_filename=%s, disable_zonefile=%s, escaped_semicolons=%s',
id, id,
directory, directory,
default_ttl, default_ttl,
@ -210,6 +215,7 @@ class YamlProvider(BaseProvider):
split_catchall, split_catchall,
shared_filename, shared_filename,
disable_zonefile, disable_zonefile,
escaped_semicolons,
) )
super().__init__(id, *args, **kwargs) super().__init__(id, *args, **kwargs)
self.directory = directory self.directory = directory
@ -222,6 +228,7 @@ class YamlProvider(BaseProvider):
self.split_catchall = split_catchall self.split_catchall = split_catchall
self.shared_filename = shared_filename self.shared_filename = shared_filename
self.disable_zonefile = disable_zonefile self.disable_zonefile = disable_zonefile
self.escaped_semicolons = escaped_semicolons
def copy(self): def copy(self):
kwargs = dict(self.__dict__) kwargs = dict(self.__dict__)
@ -326,6 +333,17 @@ class YamlProvider(BaseProvider):
if not isinstance(data, list): if not isinstance(data, list):
data = [data] data = [data]
for d in data: for d in data:
_type = d.get('type')
if not self.escaped_semicolons and _type in (
'SPF',
'TXT',
):
if 'value' in d:
d['value'] = d['value'].replace(';', '\\;')
if 'values' in d:
d['values'] = [
v.replace(';', '\\;') for v in d['values']
]
if 'ttl' not in d: if 'ttl' not in d:
d['ttl'] = self.default_ttl d['ttl'] = self.default_ttl
record = Record.new( record = Record.new(
@ -402,6 +420,11 @@ class YamlProvider(BaseProvider):
if record.ttl == self.default_ttl: if record.ttl == self.default_ttl:
# ttl is the default, we don't need to store it # ttl is the default, we don't need to store it
del d['ttl'] del d['ttl']
if not self.escaped_semicolons and record._type in ('SPF', 'TXT'):
if 'value' in d:
d['value'] = d['value'].replace('\\;', ';')
if 'values' in d:
d['values'] = [v.replace('\\;', ';') for v in d['values']]
# we want to output the utf-8 version of the name # we want to output the utf-8 version of the name
data[record.decoded_name].append(d) data[record.decoded_name].append(d)


+ 3
- 1
octodns/record/chunked.py View File

@ -9,7 +9,6 @@ from .base import ValuesMixin
class _ChunkedValuesMixin(ValuesMixin): class _ChunkedValuesMixin(ValuesMixin):
CHUNK_SIZE = 255 CHUNK_SIZE = 255
_unescaped_semicolon_re = re.compile(r'\w;')
def chunked_value(self, value): def chunked_value(self, value):
value = value.replace('"', '\\"') value = value.replace('"', '\\"')
@ -44,6 +43,7 @@ class _ChunkedValuesMixin(ValuesMixin):
class _ChunkedValue(str): class _ChunkedValue(str):
_unescaped_semicolon_re = re.compile(r'\w;') _unescaped_semicolon_re = re.compile(r'\w;')
_double_escaped_semicolon_re = re.compile(r'\\\\;')
@classmethod @classmethod
def parse_rdata_text(cls, value): def parse_rdata_text(cls, value):
@ -62,6 +62,8 @@ class _ChunkedValue(str):
for value in data: for value in data:
if cls._unescaped_semicolon_re.search(value): if cls._unescaped_semicolon_re.search(value):
reasons.append(f'unescaped ; in "{value}"') reasons.append(f'unescaped ; in "{value}"')
if cls._double_escaped_semicolon_re.search(value):
reasons.append(f'double escaped ; in "{value}"')
try: try:
value.encode('ascii') value.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:


+ 9
- 0
tests/config-semis/escaped.semis.yaml View File

@ -0,0 +1,9 @@
---
one:
type: TXT
value: This has a semi-colon\; that is escaped.
two:
type: TXT
values:
- This has a semi-colon too\; that is escaped.
- \;

+ 9
- 0
tests/config-semis/unescaped.semis.yaml View File

@ -0,0 +1,9 @@
---
one:
type: TXT
value: This has a semi-colon; that isn't escaped.
two:
type: TXT
values:
- This has a semi-colon too; that isn't escaped.
- ;

+ 51
- 0
tests/test_octodns_provider_yaml.py View File

@ -15,6 +15,7 @@ from octodns.idna import idna_encode
from octodns.provider import ProviderException from octodns.provider import ProviderException
from octodns.provider.yaml import SplitYamlProvider, YamlProvider from octodns.provider.yaml import SplitYamlProvider, YamlProvider
from octodns.record import Create, NsValue, Record, ValuesMixin from octodns.record import Create, NsValue, Record, ValuesMixin
from octodns.record.exception import ValidationError
from octodns.zone import SubzoneRecordException, Zone from octodns.zone import SubzoneRecordException, Zone
@ -471,6 +472,56 @@ xn--dj-kia8a:
# make sure that we get the idna one back # make sure that we get the idna one back
self.assertEqual(idna, provider._zone_sources(zone)) self.assertEqual(idna, provider._zone_sources(zone))
def test_unescaped_semicolons(self):
source = YamlProvider(
'test',
join(dirname(__file__), 'config-semis'),
escaped_semicolons=False,
)
zone = Zone('unescaped.semis.', [])
source.populate(zone)
self.assertEqual(2, len(zone.records))
one = next(r for r in zone.records if r.name == 'one')
self.assertTrue(one)
self.assertEqual(
["This has a semi-colon\\; that isn't escaped."], one.values
)
two = next(r for r in zone.records if r.name == 'two')
self.assertTrue(two)
self.assertEqual(
["This has a semi-colon too\\; that isn't escaped.", '\\;'],
two.values,
)
escaped = Zone('escaped.semis.', [])
with self.assertRaises(ValidationError) as ctx:
source.populate(escaped)
self.assertEqual(
[
'double escaped ; in "This has a semi-colon\\\\; that is escaped."'
],
ctx.exception.reasons,
)
with TemporaryDirectory() as td:
# Add some subdirs to make sure that it can create them
target = YamlProvider(
'target', td.dirname, escaped_semicolons=False
)
yaml_file = join(td.dirname, 'unescaped.semis.yaml')
plan = target.plan(zone)
target.apply(plan)
with open(yaml_file) as fh:
content = fh.read()
self.assertTrue('value: This has a semi-colon; that' in content)
self.assertTrue(
"- This has a semi-colon too; that isn't escaped." in content
)
self.assertTrue('- ;' in content)
class TestSplitYamlProvider(TestCase): class TestSplitYamlProvider(TestCase):
def test_list_all_yaml_files(self): def test_list_all_yaml_files(self):


+ 6
- 0
tests/test_octodns_record_chunked.py View File

@ -59,6 +59,12 @@ class TestChunkedValue(TestCase):
_ChunkedValue.validate('hello; world', 'TXT'), _ChunkedValue.validate('hello; world', 'TXT'),
) )
# double escaped ;
self.assertEqual(
['double escaped ; in "hello\\\\; world"'],
_ChunkedValue.validate('hello\\\\; world', 'TXT'),
)
# non-asci # non-asci
self.assertEqual( self.assertEqual(
['non ASCII character in "v=spf1 –all"'], ['non ASCII character in "v=spf1 –all"'],


Loading…
Cancel
Save