1313from pydantic import BaseModel
1414from requests import Response
1515
16- from fastapi_keycloak .exceptions import KeycloakError , MandatoryActionException , UpdateUserLocaleException , ConfigureTOTPException , VerifyEmailException , \
17- UpdateProfileException , UpdatePasswordException
18- from fastapi_keycloak .model import HTTPMethod , KeycloakUser , OIDCUser , KeycloakToken , KeycloakRole , KeycloakIdentityProvider , KeycloakGroup
16+ from fastapi_keycloak .exceptions import (
17+ KeycloakError , MandatoryActionException , UpdateUserLocaleException ,
18+ ConfigureTOTPException , VerifyEmailException ,
19+ UpdateProfileException , UpdatePasswordException )
20+ from fastapi_keycloak .model import (
21+ HTTPMethod , KeycloakUser , OIDCUser , KeycloakToken , KeycloakRole ,
22+ KeycloakIdentityProvider , KeycloakGroup
23+ )
1924
2025
2126def result_or_error (response_model : Type [BaseModel ] = None , is_list : bool = False ) -> List [BaseModel ] or BaseModel or KeycloakError :
@@ -44,10 +49,7 @@ def inner(f):
4449 def wrapper (* args , ** kwargs ):
4550
4651 def create_list (json : List [dict ]):
47- items = list ()
48- for entry in json :
49- items .append (response_model .parse_obj (entry ))
50- return items
52+ return [response_model .parse_obj (entry ) for entry in json ]
5153
5254 def create_object (json : dict ):
5355 return response_model .parse_obj (json )
@@ -85,8 +87,8 @@ def create_object(json: dict):
8587class FastAPIKeycloak :
8688 """ Instance to wrap the Keycloak API with FastAPI
8789
88- Attributes:
89- _admin_token (KeycloakToken): A KeycloakToken instance, containing the access token that is used for any admin related request
90+ Attributes: _admin_token (KeycloakToken): A KeycloakToken instance, containing the access token that is used for
91+ any admin related request
9092
9193 Example:
9294 ```python
@@ -138,9 +140,8 @@ def admin_token(self):
138140 """
139141 if self .token_is_valid (token = self ._admin_token ):
140142 return self ._admin_token
141- else :
142- self ._get_admin_token ()
143- return self .admin_token
143+ self ._get_admin_token ()
144+ return self .admin_token
144145
145146 @admin_token .setter
146147 def admin_token (self , value : str ):
@@ -185,7 +186,8 @@ def user_auth_scheme(self) -> OAuth2PasswordBearer:
185186 return OAuth2PasswordBearer (tokenUrl = self .token_uri )
186187
187188 def get_current_user (self , required_roles : List [str ] = None ) -> OIDCUser :
188- """ Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed by the user
189+ """ Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed
190+ by the user
189191
190192 Args:
191193 required_roles List[str]: List of role names required for this endpoint
@@ -236,7 +238,8 @@ def open_id_configuration(self) -> dict:
236238 return response .json ()
237239
238240 def proxy (self , relative_path : str , method : HTTPMethod , additional_headers : dict = None , payload : dict = None ) -> Response :
239- """ Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be exposed under any circumstances. Grants full API admin access.
241+ """ Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be
242+ exposed under any circumstances. Grants full API admin access.
240243
241244 Args:
242245
@@ -285,10 +288,17 @@ def _get_admin_token(self) -> None:
285288 response = requests .post (url = self .token_uri , headers = headers , data = data )
286289 try :
287290 self .admin_token = response .json ()['access_token' ]
288- except JSONDecodeError :
289- raise KeycloakError (reason = response .content .decode ('utf-8' ), status_code = response .status_code )
290- except KeyError :
291- raise KeycloakError (reason = f"The response did not contain an access_token: { response .json ()} " , status_code = 403 )
291+ except JSONDecodeError as e :
292+ raise KeycloakError (
293+ reason = response .content .decode ('utf-8' ),
294+ status_code = response .status_code ,
295+ ) from e
296+
297+ except KeyError as e :
298+ raise KeycloakError (
299+ reason = f"The response did not contain an access_token: { response .json ()} " ,
300+ status_code = 403 ,
301+ ) from e
292302
293303 @functools .cached_property
294304 def public_key (self ) -> str :
@@ -471,11 +481,10 @@ def get_subgroups(self, group: KeycloakGroup, path: str):
471481 return subgroup
472482 elif subgroup .subGroups :
473483 for subgroup in group .subGroups :
474- subgroups = self .get_subgroups (subgroup , path )
475- if subgroups :
484+ if subgroups := self .get_subgroups (subgroup , path ):
476485 return subgroups
477486 # Went through the tree without hits
478- return None
487+ return None
479488
480489 @result_or_error (response_model = KeycloakGroup )
481490 def get_group_by_path (self , path : str , search_in_subgroups = True ) -> KeycloakGroup or None :
@@ -502,8 +511,7 @@ def get_group_by_path(self, path: str, search_in_subgroups=True) -> KeycloakGrou
502511 return group
503512 res = self .get_subgroups (group , path )
504513 if res is not None :
505- return res
506- return None
514+ return res
507515
508516 @result_or_error (response_model = KeycloakGroup )
509517 def get_group (self , group_id : str ) -> KeycloakGroup or None :
@@ -669,13 +677,12 @@ def create_user(
669677 "requiredActions" : ["VERIFY_EMAIL" if send_email_verification else None ]
670678 }
671679 response = self ._admin_request (url = self .users_uri , data = data , method = HTTPMethod .POST )
672- if response .status_code == 201 :
673- user = self .get_user (query = f'username={ username } ' )
674- if send_email_verification :
675- self .send_email_verification (user .id )
676- return user
677- else :
680+ if response .status_code != 201 :
678681 return response
682+ user = self .get_user (query = f'username={ username } ' )
683+ if send_email_verification :
684+ self .send_email_verification (user .id )
685+ return user
679686
680687 @result_or_error ()
681688 def change_password (self , user_id : str , new_password : str , temporary : bool = False ) -> dict :
@@ -747,9 +754,8 @@ def update_user(self, user: KeycloakUser):
747754 Raises:
748755 KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
749756
750- Notes:
751- - You may alter any aspect of the user object, also the requiredActions for instance. There is not explicit function for updating those as it is a user update in
752- essence
757+ Notes: - You may alter any aspect of the user object, also the requiredActions for instance. There is no
758+ explicit function for updating those as it is a user update in essence
753759 """
754760 response = self ._admin_request (url = f'{ self .users_uri } /{ user .id } ' , data = user .__dict__ , method = HTTPMethod .PUT )
755761 if response .status_code == 204 : # Update successful
@@ -781,8 +787,7 @@ def get_all_users(self) -> List[KeycloakUser]:
781787 Raises:
782788 KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
783789 """
784- response = self ._admin_request (url = self .users_uri , method = HTTPMethod .GET )
785- return response
790+ return self ._admin_request (url = self .users_uri , method = HTTPMethod .GET )
786791
787792 @result_or_error (response_model = KeycloakIdentityProvider , is_list = True )
788793 def get_identity_providers (self ) -> List [KeycloakIdentityProvider ]:
@@ -798,7 +803,8 @@ def get_identity_providers(self) -> List[KeycloakIdentityProvider]:
798803
799804 @result_or_error (response_model = KeycloakToken )
800805 def user_login (self , username : str , password : str ) -> KeycloakToken :
801- """ Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed errors if login fails due to requiredActions
806+ """ Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed
807+ errors if login fails due to requiredActions
802808
803809 Args:
804810 username (str): Username used for login
@@ -846,15 +852,17 @@ def user_login(self, username: str, password: str) -> KeycloakToken:
846852 }.get (
847853 reason , # Try to return the matching exception
848854 # On custom or unknown actions return a MandatoryActionException by default
849- MandatoryActionException (detail = f"This user can't login until the following action has been resolved: { reason } " )
855+ MandatoryActionException (detail = f"This user can't login until the following action has been "
856+ f"resolved: { reason } " )
850857 )
851858 raise exception
852859 return response
853860
854861 @result_or_error (response_model = KeycloakToken )
855862 def exchange_authorization_code (self , session_state : str , code : str ) -> KeycloakToken :
856- """ Models the authorization code OAuth2 flow. Opening the URL provided by `login_uri` will result in a callback to the configured callback URL.
857- The callback will also create a session_state and code query parameter that can be exchanged for an access token.
863+ """ Models the authorization code OAuth2 flow. Opening the URL provided by `login_uri` will result in a
864+ callback to the configured callback URL. The callback will also create a session_state and code query
865+ parameter that can be exchanged for an access token.
858866
859867 Args:
860868 session_state (str): Salt to reduce the risk of successful attacks
@@ -877,8 +885,7 @@ def exchange_authorization_code(self, session_state: str, code: str) -> Keycloak
877885 "grant_type" : "authorization_code" ,
878886 "redirect_uri" : self .callback_uri
879887 }
880- response = requests .post (url = self .token_uri , headers = headers , data = data )
881- return response
888+ return requests .post (url = self .token_uri , headers = headers , data = data )
882889
883890 def _admin_request (self , url : str , method : HTTPMethod , data : dict = None , content_type : str = "application/json" ) -> Response :
884891 """ Private method that is the basis for any requests requiring admin access to the api. Will append the necessary `Authorization` header
@@ -999,7 +1006,7 @@ def _decode_token(self, token: str, options: dict = None, audience: str = None)
9991006
10001007 def __str__ (self ):
10011008 """ String representation """
1002- return f 'FastAPI Keycloak Integration'
1009+ return 'FastAPI Keycloak Integration'
10031010
10041011 def __repr__ (self ):
10051012 """ Debug representation """
0 commit comments