Skip to content

Commit

Permalink
Merge pull request #126 from akquinet/regex-records-permission
Browse files Browse the repository at this point in the history
allow records with regex
  • Loading branch information
rwxd authored Nov 28, 2024
2 parents 83ed0b8 + c6e55d4 commit 8bf0201
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 8 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ environments:
- "test.example.com"
```

###### Regex

Additionally to the `records` list a `regex_records` list can be defined.
In this list regex can be to define, which records are allowed.

```yaml
...
environments:
- name: "Test1"
...
zones:
- name: "example.com"
regex_records:
- "_acme-challenge.service-.*.example.com"
```

##### Services

Under a `zone` `services` can be defined.
Expand Down
10 changes: 9 additions & 1 deletion powerdns_api_proxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
RRSETRequest,
ZoneNotAllowedException,
)
from powerdns_api_proxy.utils import check_zones_equal
from powerdns_api_proxy.utils import check_record_in_regex, check_zones_equal


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -147,10 +147,18 @@ def check_rrset_allowed(zone: ProxyConfigZone, rrset: RRSET) -> bool:
if zone.all_records:
return True

if not rrset['name'].rstrip('.').endswith(zone.name.rstrip('.')):
logger.debug('RRSET not allowed, because zone does not match')
return False

for record in zone.records:
if check_zones_equal(rrset['name'], record):
return True

for regex in zone.regex_records:
if check_record_in_regex(rrset['name'], regex):
return True

if check_acme_record_allowed(zone, rrset):
return True

Expand Down
6 changes: 5 additions & 1 deletion powerdns_api_proxy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ class ProxyConfigServices(BaseModel):
class ProxyConfigZone(BaseModel):
'''
`name` is the zone name.
`description` is a description of the zone.
`regex` should be set to `True` if `name` is a regex.
`records` is a list of record names that are allowed.
`regex_records` is a list of record regexes that are allowed.
`admin` enabled creating and deleting the zone.
`subzones` sets the same permissions on all subzones.
`all_records` will be set to `True` if no `records` are defined.
Expand All @@ -30,6 +33,7 @@ class ProxyConfigZone(BaseModel):
regex: bool = False
description: str = ''
records: list[str] = []
regex_records: list[str] = []
services: ProxyConfigServices = ProxyConfigServices(acme=False)
admin: bool = False
subzones: bool = False
Expand All @@ -38,7 +42,7 @@ class ProxyConfigZone(BaseModel):

def __init__(self, **data):
super().__init__(**data)
if len(self.records) == 0:
if len(self.records) == 0 and len(self.regex_records) == 0:
logger.debug(
f'Setting all_records to True for zone {self.name}, because no records are defined'
)
Expand Down
5 changes: 5 additions & 0 deletions powerdns_api_proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def check_zone_in_regex(zone: str, regex: str) -> bool:
return re.match(regex, zone.rstrip('.')) is not None


def check_record_in_regex(record: str, regex: str) -> bool:
'''Checks if record is in regex'''
return re.match(regex, record.rstrip('.')) is not None


def check_zones_equal(zone1: str, zone2: str) -> bool:
'''Checks if zones equal with or without trailing dot'''
return zone1.rstrip('.') == zone2.rstrip('.')
125 changes: 119 additions & 6 deletions tests/unit/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ def test_check_rrset_not_allowed_single_entries():
],
)
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
'test-zone.example.com.',
'entry100.test-zone.example.com.',
'entry200.entry1.test-zone.example.com',
'test-record.example.com.',
]:
rrset: RRSET = {
'name': item,
Expand All @@ -293,7 +293,7 @@ def test_check_rrset_not_allowed_single_entries():
'records': [],
'comments': [],
}
assert check_rrset_allowed(zone, rrset)
assert not check_rrset_allowed(zone, rrset)


def test_check_rrsets_request_allowed_no_raise():
Expand Down Expand Up @@ -348,8 +348,8 @@ def test_check_rrsets_request_allowed_raise():
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'


def test_check_rrsets_request_not_allowed_read_only():
Expand Down Expand Up @@ -378,6 +378,119 @@ def test_check_rrsets_request_not_allowed_read_only():
assert err.value.detail == 'RRSET update not allowed with read only token'


def test_rrset_request_not_allowed_regex_empty():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[],
)
request: RRSETRequest = {'rrsets': []}
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_allowed_all_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'.*',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_allowed_acme_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'_acme-challenge.example.*.test-zone.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'_acme-challenge.example-entry.test-zone.example.com.',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
assert ensure_rrsets_request_allowed(zone, request)


def test_rrset_request_not_allowed_false_regex():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'example.*.test-zone.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'entry1.test-zone.example.com.',
'entry2.entry1.test-zone.example.com',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed'


def test_rrset_request_not_allowed_false_zone():
zone = ProxyConfigZone(
name='test-zone.example.com.',
regex_records=[
'example.*.test-zone2.example.com',
],
)
request: RRSETRequest = {'rrsets': []}
for item in [
'example1.test-zone2.example.com.',
]:
request['rrsets'].append(
{
'name': item,
'type': 'TXT',
'changetype': 'REPLACE',
'ttl': 3600,
'records': [],
'comments': [],
}
)
with pytest.raises(HTTPException) as err:
ensure_rrsets_request_allowed(zone, request)
assert err.value.status_code == 403
assert err.value.detail == 'RRSET example1.test-zone2.example.com. not allowed'


def test_check_acme_record_allowed_all_records():
zone = ProxyConfigZone(name='test-zone.example.com', all_records=True)
rrset = RRSET(
Expand Down

0 comments on commit 8bf0201

Please sign in to comment.