Skip to content

Commit dc4cce1

Browse files
Merge pull request #89 from leifj/attribute-authz
support for attribute-based authorization
2 parents 0e5afd4 + cf2bafc commit dc4cce1

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module: satosa.micro_services.attribute_authorization.AttributeAuthorization
2+
name: AttributeAuthorization
3+
config:
4+
attribute_allow:
5+
target_provider1:
6+
requester1:
7+
attr1:
8+
- "^foo:bar$"
9+
- "^kaka$"
10+
default:
11+
attr1:
12+
- "plupp@.+$"
13+
"":
14+
"":
15+
attr2:
16+
- "^knytte:.*$"
17+
attribute_deny:
18+
default:
19+
default:
20+
eppn:
21+
- "^[^@]+$"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import re
2+
3+
from .base import ResponseMicroService
4+
from ..exception import SATOSAAuthenticationError
5+
from ..util import get_dict_defaults
6+
7+
class AttributeAuthorization(ResponseMicroService):
8+
9+
"""
10+
A microservice that performs simple regexp-based authorization based on response
11+
attributes. The configuration assumes a dict with two keys: attributes_allow
12+
and attributes_deny. An examples speaks volumes:
13+
14+
```yaml
15+
config:
16+
attribute_allow:
17+
target_provider1:
18+
requester1:
19+
attr1:
20+
- "^foo:bar$"
21+
- "^kaka$"
22+
default:
23+
attr1:
24+
- "plupp@.+$"
25+
"":
26+
"":
27+
attr2:
28+
- "^knytte:.*$"
29+
attribute_deny:
30+
default:
31+
default:
32+
eppn:
33+
- "^[^@]+$"
34+
35+
```
36+
37+
The use of "" and 'default' is synonymous. Attribute rules are not overloaded
38+
or inherited. For instance a response from "provider2" would only be allowed
39+
through if the eppn attribute had all values containing an '@' (something
40+
perhaps best implemented via an allow rule in practice). Responses from
41+
target_provider1 bound for requester1 would be allowed through only if attr1
42+
contained foo:bar or kaka. Note that attribute filters (the leaves of the
43+
structure above) are ORed together - i.e any attribute match is sufficient.
44+
"""
45+
46+
def __init__(self, config, *args, **kwargs):
47+
super().__init__(*args, **kwargs)
48+
self.attribute_allow = config.get("attribute_allow", {})
49+
self.attribute_deny = config.get("attribute_deny", {})
50+
51+
def _check_authz(self, context, attributes, requester, provider):
52+
for attribute_name, attribute_filters in get_dict_defaults(self.attribute_allow, requester, provider).items():
53+
if attribute_name in attributes:
54+
if not any([any(filter(re.compile(af).search, attributes[attribute_name])) for af in attribute_filters]):
55+
raise SATOSAAuthenticationError(context.state, "Permission denied")
56+
57+
for attribute_name, attribute_filters in get_dict_defaults(self.attribute_deny, requester, provider).items():
58+
if attribute_name in attributes:
59+
if any([any(filter(re.compile(af).search, attributes[attribute_name])) for af in attribute_filters]):
60+
raise SATOSAAuthenticationError(context.state, "Permission denied")
61+
62+
def process(self, context, data):
63+
self._check_authz(context, data.attributes, data.requester, data.auth_info.issuer)
64+
return super().process(context, data)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
from satosa.internal_data import InternalResponse, AuthenticationInformation
2+
from satosa.micro_services.attribute_authorization import AttributeAuthorization
3+
from satosa.exception import SATOSAAuthenticationError
4+
from satosa.context import Context
5+
6+
class TestAttributeAuthorization:
7+
def create_authz_service(self, attribute_allow, attribute_deny):
8+
authz_service = AttributeAuthorization(config=dict(attribute_allow=attribute_allow,attribute_deny=attribute_deny), name="test_authz",
9+
base_url="https://satosa.example.com")
10+
authz_service.next = lambda ctx, data: data
11+
return authz_service
12+
13+
def test_authz_allow_success(self):
14+
attribute_allow = {
15+
"": { "default": {"a0": ['.+@.+']} }
16+
}
17+
attribute_deny = {}
18+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
19+
resp = InternalResponse(AuthenticationInformation(None, None, None))
20+
resp.attributes = {
21+
"a0": ["test@example.com"],
22+
}
23+
try:
24+
ctx = Context()
25+
ctx.state = dict()
26+
authz_service.process(ctx, resp)
27+
except SATOSAAuthenticationError as ex:
28+
assert False
29+
30+
def test_authz_allow_fail(self):
31+
attribute_allow = {
32+
"": { "default": {"a0": ['foo1','foo2']} }
33+
}
34+
attribute_deny = {}
35+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
36+
resp = InternalResponse(AuthenticationInformation(None, None, None))
37+
resp.attributes = {
38+
"a0": ["bar"],
39+
}
40+
try:
41+
ctx = Context()
42+
ctx.state = dict()
43+
authz_service.process(ctx, resp)
44+
assert False
45+
except SATOSAAuthenticationError as ex:
46+
assert True
47+
48+
def test_authz_allow_second(self):
49+
attribute_allow = {
50+
"": { "default": {"a0": ['foo1','foo2']} }
51+
}
52+
attribute_deny = {}
53+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
54+
resp = InternalResponse(AuthenticationInformation(None, None, None))
55+
resp.attributes = {
56+
"a0": ["foo2","kaka"],
57+
}
58+
try:
59+
ctx = Context()
60+
ctx.state = dict()
61+
authz_service.process(ctx, resp)
62+
except SATOSAAuthenticationError as ex:
63+
assert False
64+
65+
def test_authz_deny_success(self):
66+
attribute_deny = {
67+
"": { "default": {"a0": ['foo1','foo2']} }
68+
}
69+
attribute_allow = {}
70+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
71+
resp = InternalResponse(AuthenticationInformation(None, None, None))
72+
resp.attributes = {
73+
"a0": ["foo2"],
74+
}
75+
try:
76+
ctx = Context()
77+
ctx.state = dict()
78+
authz_service.process(ctx, resp)
79+
assert False
80+
except SATOSAAuthenticationError as ex:
81+
assert True
82+
83+
def test_authz_deny_fail(self):
84+
attribute_deny = {
85+
"": { "default": {"a0": ['foo1','foo2']} }
86+
}
87+
attribute_allow = {}
88+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
89+
resp = InternalResponse(AuthenticationInformation(None, None, None))
90+
resp.attributes = {
91+
"a0": ["foo3"],
92+
}
93+
try:
94+
ctx = Context()
95+
ctx.state = dict()
96+
authz_service.process(ctx, resp)
97+
except SATOSAAuthenticationError as ex:
98+
assert False

0 commit comments

Comments
 (0)