Skip to content

Commit

Permalink
add keycloak validation of authentication groups support
Browse files Browse the repository at this point in the history
  • Loading branch information
iDmple authored and totaam committed Apr 5, 2024
1 parent 6a67128 commit b4ba4ca
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
71 changes: 69 additions & 2 deletions xpra/server/auth/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
KEYCLOAK_CLIENT_ID = os.environ.get("XPRA_KEYCLOAK_CLIENT_ID", "example_client")
KEYCLOAK_CLIENT_SECRET_KEY = os.environ.get("XPRA_KEYCLOAK_CLIENT_SECRET_KEY", "secret")
KEYCLOAK_REDIRECT_URI = os.environ.get("XPRA_KEYCLOAK_REDIRECT_URI", "http://localhost/login/")
KEYCLOAK_CLAIM_FIELD = os.environ.get("XPRA_KEYCLOAK_CLAIM_FIELD", "") # field containing groups claim
KEYCLOAK_AUTH_GROUPS = os.environ.get("XPRA_KEYCLOAK_AUTH_GROUPS", "") # authorized groups
KEYCLOAK_AUTH_CONDITION = os.environ.get("XPRA_KEYCLOAK_AUTH_CONDITION", "and") # authentication condition
KEYCLOAK_SCOPE = os.environ.get("XPRA_KEYCLOAK_SCOPE", "openid")
KEYCLOAK_GRANT_TYPE = os.environ.get("XPRA_KEYCLOAK_GRANT_TYPE", "authorization_code")

Expand All @@ -27,6 +30,9 @@ def __init__(self, **kwargs):
self.client_id = kwargs.pop("client_id", KEYCLOAK_CLIENT_ID)
self.client_secret_key = kwargs.pop("client_secret_key", KEYCLOAK_CLIENT_SECRET_KEY)
self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI)
self.claim_field = kwargs.pop("claim_field", KEYCLOAK_CLAIM_FIELD)
self.auth_groups = set(kwargs.pop("auth_groups", KEYCLOAK_AUTH_GROUPS).split())
self.auth_condition = kwargs.pop("auth_condition", KEYCLOAK_AUTH_CONDITION)
self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE)
self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE)

Expand Down Expand Up @@ -158,9 +164,70 @@ def check(self, response_json: bytes) -> bool:
log.error("Error: keycloak authentication failed as token state is invalid")
return False

log("keycloak: token is active")

user_info = keycloak_openid.userinfo(access_token)
log("userinfo_info: %r", user_info)
log("keycloak authentication succeeded: token is active")
log("user_info: %r", user_info)

log("claim_field: %r", self.claim_field)
log("auth_groups: %r", self.auth_groups)

# if claim_field is defined, check that the auth_groups match the groups_claim condition
if self.claim_field:
if not self.auth_groups:
log.error("Error: keycloak authentication failed as auth_groups is invalid")
return False

def extract_groups_claim(data_dict, keys):

"""
Extracts a nested value from a dictionary using a list of keys.
This function is specifically designed to handle nested dictionaries,
such as the ones provided by Keycloak userinfo.
For example, userinfo could contain the following structure:
"resource_access": {
"roles": {
"group": [
"client-group-1",
"client-group-2"
]
}
}
In this scenario, the function traverses the dictionary using the keys
['resource_access', 'roles', 'group'] to extract the list of groups
['client-group-1', 'client-group-2'].
Args:
data_dict (dict): The dictionary from which to extract the nested value.
keys (list): A list of keys that represents the path to the desired value
in the nested dictionary.
Returns:
The nested value extracted from the dictionary. If the sequence of keys is
exhausted, it returns the portion of the dictionary that it has navigated to.
"""
return extract_groups_claim(data_dict[keys.pop(0)], keys) if len(keys) else data_dict

groups_claim = set(extract_groups_claim(user_info, self.claim_field.split('.')))
log("groups_claim: %r", groups_claim)

if self.auth_condition == "or":
if not self.auth_groups.intersection(groups_claim):
log.error("Error: keycloak authentication failed as groups claim is not satisfied")
return False
elif self.auth_condition == "and":
if not self.auth_groups.issubset(groups_claim):
log.error("Error: keycloak authentication failed as groups claim is not satisfied")
return False
else:
log.error("Error: keycloak authentication failed as auth_condition is invalid")
return False

log("keycloak authentication succeeded")
return True
except KeycloakError as e:
log.error("Error: keycloak authentication failed")
Expand Down
3 changes: 1 addition & 2 deletions xpra/server/auth/sys_auth_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ def unxor_response(self, caps: typedict) -> bytes:
return b""
salt = self.get_response_salt(client_salt)
value = gendigest("xor", challenge_response, salt)
log(f"unxor_response(..) challenge-response=%s, client-salt={client_salt!r}, response salt={salt!r}",
obsc(repr(challenge_response)))
log(f"unxor_response(..) challenge-response={obsc(repr(challenge_response))}, client-salt={client_salt!r}, response salt={salt!r}")
return value

def default_authenticate_check(self, caps: typedict) -> bool:
Expand Down

0 comments on commit b4ba4ca

Please sign in to comment.