Skip to content

Commit 88d9b7c

Browse files
marius-matheronesandzeroesuwwint
authored
feat: implement role checks across the whole admin API (AAI-453) (#99)
* refactor: rename user_is_admin -> user_is_general_admin to make purpose clearer * refactor: rename SessionUser.is_admin SessionUser.is_biocommons_admin * feat: add group/platform admin checks for the user * refactor: rename get_current_user -> get_session_user for clarity * refactor: add DB checks to user_is_general_admin check * test: update tests to allow for updated admin check * test: add tests of user_is_general_admin check * test: add tests of DB admin checks * refactor: update is-admin endpoint to is-general-admin * refactor: update tests of is-admin check * refactor: move user permissions to a new module * feat: add check for is_biocommons_admin and a new router for biocommons admin routes * chore: add biocommons_admin router to main app * refactor: start moving routes to /biocommons-admin * refactor: simplify creating groups in the API * refactor: lookup methods for roles and platforms * refactor: rework group/role/platform creation endpoints * test: move biocommons-admin tests to a dedicated module * test: fix mocks in user tests * feat: add is_admin check for platform model * refactor: rework UserQueryParams to add group/platform admin checks, use it in all endpoints * test: update tests of get_users to include platform-specific users * test: test group or platform admin access * feat: more permissions checks that can be applied to endpoints * refactor: move 'get_or_404' checks into models * refactor: add permission checks to admin endpoints * refactor: renamed get_for_admin_roles -> get_from_admin_roles * refactor: move shared param types to schemas to avoid circular import * fix: should use _or_404 for getting the group * fix: need to return platform * fix: fix checking admin roles for platform * fix: allow for / in group IDs in URLs * test: update tests of admin API to reflect permissions * refactor: exclude biocommons-admin endpoints from docs/API schema * test: include short_name when creating group * feat: add simple endpoint for checking if current user has admin rights to a specific platform * fix: sync platform roles to db, response typing test: increase test coverate to 100% * fix: undefiend role name in error message * docs: docstring fixes * docs: update DB diagram --------- Co-authored-by: Marius <marius.mather@gmail.com> Co-authored-by: Uwe Winter <uwwint@gmail.com>
1 parent a5ba688 commit 88d9b7c

23 files changed

+1557
-816
lines changed

auth/user_permissions.py

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
from typing import Annotated
2+
3+
from fastapi import Depends, HTTPException
4+
from sqlmodel import Session
5+
from starlette import status
6+
7+
from auth.validator import oauth2_scheme, verify_jwt
8+
from config import Settings, get_settings
9+
from db.models import BiocommonsGroup, BiocommonsUser, Platform
10+
from db.setup import get_db_session
11+
from schemas.biocommons import ServiceIdParam, UserIdParam
12+
from schemas.user import SessionUser
13+
14+
15+
def get_session_user(
16+
token: str = Depends(oauth2_scheme), settings: Settings = Depends(get_settings)
17+
) -> SessionUser:
18+
"""
19+
Get the current user's session data (access token).
20+
"""
21+
access_token = verify_jwt(token, settings=settings)
22+
return SessionUser(access_token=access_token)
23+
24+
25+
def get_db_user(
26+
current_user: Annotated[SessionUser, Depends(get_session_user)],
27+
db_session: Annotated[Session, Depends(get_db_session)], ) -> BiocommonsUser | None:
28+
"""
29+
Get the user's DB record.
30+
"""
31+
user = db_session.get(BiocommonsUser, current_user.access_token.sub)
32+
return user
33+
34+
35+
def user_is_general_admin(
36+
current_user: Annotated[SessionUser, Depends(get_session_user)],
37+
settings: Annotated[Settings, Depends(get_settings)],
38+
db_user: Annotated[BiocommonsUser, Depends(get_db_user)],
39+
db_session: Annotated[Session, Depends(get_db_session)],
40+
) -> SessionUser:
41+
"""
42+
Check if user has general admin privileges.
43+
This can come from:
44+
* A role listed in settings.admin_roles (for BioCommons admins)
45+
* A role listed in a group/platform's admin_roles in the DB (for platform sysadmins/project managers)
46+
"""
47+
if current_user.is_biocommons_admin(settings=settings):
48+
return current_user
49+
if db_user is not None:
50+
if db_user.is_any_platform_admin(access_token=current_user.access_token, db_session=db_session):
51+
return current_user
52+
if db_user.is_any_group_admin(access_token=current_user.access_token, db_session=db_session):
53+
return current_user
54+
raise HTTPException(
55+
status_code=status.HTTP_403_FORBIDDEN,
56+
detail="You must be an admin to access this endpoint.",
57+
)
58+
59+
60+
def user_is_biocommons_admin(
61+
current_user: Annotated[SessionUser, Depends(get_session_user)],
62+
settings: Annotated[Settings, Depends(get_settings)],
63+
) -> SessionUser:
64+
if current_user.is_biocommons_admin(settings=settings):
65+
return current_user
66+
raise HTTPException(
67+
status_code=status.HTTP_403_FORBIDDEN,
68+
detail="You must be an admin to access this endpoint.",
69+
)
70+
71+
72+
def has_platform_admin_permission(
73+
platform_id: str,
74+
current_user: Annotated[SessionUser, Depends(get_session_user)],
75+
db_session: Annotated[Session, Depends(get_db_session)],
76+
) -> bool:
77+
"""
78+
Check if user has platform admin privileges.
79+
"""
80+
platform = Platform.get_by_id(platform_id, db_session)
81+
if platform is None:
82+
raise HTTPException(
83+
status_code=status.HTTP_404_NOT_FOUND,
84+
detail=f"Platform '{platform_id}' not found",
85+
)
86+
return platform.user_is_admin(current_user)
87+
88+
89+
def has_admin_permission_for_user(
90+
user_id: str,
91+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
92+
db_session: Annotated[Session, Depends(get_db_session)],
93+
) -> bool:
94+
"""
95+
Check if the current admin has the right to manage the specified user,
96+
as either a platform admin or a group admin.
97+
"""
98+
platform_permission = has_platform_admin_permission_for_user(
99+
user_id=user_id,
100+
admin=admin,
101+
db_session=db_session,
102+
)
103+
group_permission = has_group_admin_permission_for_user(
104+
user_id=user_id,
105+
admin=admin,
106+
db_session=db_session,
107+
)
108+
return platform_permission or group_permission
109+
110+
111+
def has_platform_admin_permission_for_user(
112+
user_id: str,
113+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
114+
db_session: Annotated[Session, Depends(get_db_session)],
115+
) -> bool:
116+
"""
117+
Check if the current admin has the right to manage the specified user,
118+
based on platform admin roles.
119+
"""
120+
user_in_db = BiocommonsUser.get_by_id(user_id, session=db_session)
121+
if user_in_db is None:
122+
raise HTTPException(
123+
status_code=status.HTTP_404_NOT_FOUND,
124+
detail=f"User '{user_id}' not found",
125+
)
126+
admin_platforms = Platform.get_for_admin_roles(
127+
role_names=admin.access_token.biocommons_roles,
128+
session=db_session,
129+
)
130+
for membership in user_in_db.platform_memberships:
131+
if membership.platform in admin_platforms:
132+
return True
133+
return False
134+
135+
136+
def has_group_admin_permission_for_user(
137+
user_id: str,
138+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
139+
db_session: Annotated[Session, Depends(get_db_session)],
140+
) -> bool:
141+
"""
142+
Check if the current admin has the right to manage the specified user,
143+
based on group admin roles.
144+
"""
145+
user_in_db = BiocommonsUser.get_by_id(user_id, session=db_session)
146+
admin_groups = BiocommonsGroup.get_for_admin_roles(
147+
role_names=admin.access_token.biocommons_roles,
148+
session=db_session,
149+
)
150+
for membership in user_in_db.group_memberships:
151+
if membership.group in admin_groups:
152+
return True
153+
return False
154+
155+
156+
def require_admin_permission_for_user(
157+
user_id: Annotated[str, UserIdParam],
158+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
159+
db_session: Annotated[Session, Depends(get_db_session)],
160+
):
161+
"""
162+
Dependency for checking if the current admin has the right to manage the
163+
specified user (user_id should be a path parameter).
164+
Raises an HTTPException if not.
165+
"""
166+
if not has_admin_permission_for_user(user_id, admin, db_session):
167+
raise HTTPException(
168+
status_code=status.HTTP_403_FORBIDDEN,
169+
detail="You do not have permission to manage this user.",
170+
)
171+
return True
172+
173+
174+
def require_admin_permission_for_platform(
175+
platform_id: Annotated[str, ServiceIdParam],
176+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
177+
db_session: Annotated[Session, Depends(get_db_session)],
178+
):
179+
"""
180+
Dependency for checking if the current admin has the right to manage the
181+
platform (platform_id should be a path parameter).
182+
183+
Raises HTTPException if the admin doesn't have permission.
184+
"""
185+
platform = Platform.get_by_id(platform_id, db_session)
186+
if not platform.user_is_admin(admin):
187+
raise HTTPException(
188+
status_code=status.HTTP_403_FORBIDDEN,
189+
detail="You do not have permission to manage this platform.",
190+
)
191+
192+
193+
def require_admin_permission_for_group(
194+
group_id: Annotated[str, ServiceIdParam],
195+
admin: Annotated[SessionUser, Depends(user_is_general_admin)],
196+
db_session: Annotated[Session, Depends(get_db_session)],
197+
):
198+
"""
199+
Dependency for checking if the current admin has the right to manage the
200+
group (group_id should be a path parameter).
201+
"""
202+
group = BiocommonsGroup.get_by_id_or_404(group_id, db_session)
203+
if not group.user_is_admin(admin):
204+
raise HTTPException(
205+
status_code=status.HTTP_403_FORBIDDEN,
206+
detail="You do not have permission to manage this group.",
207+
)
208+
return True

auth/validator.py

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
from typing import Annotated
2-
31
import httpx
42
from cachetools import TTLCache
5-
from fastapi import Depends, HTTPException, status
3+
from fastapi import HTTPException
64
from fastapi.security import OAuth2PasswordBearer
75
from jose import jwk, jwt
86
from jose.exceptions import JWTError
97

10-
from config import Settings, get_settings
8+
from config import Settings
119
from schemas.tokens import AccessTokenPayload
12-
from schemas.user import SessionUser
1310

1411
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
1512

@@ -77,22 +74,3 @@ def get_rsa_key(token: str, settings: Settings, retry_on_failure: bool = True) -
7774
return get_rsa_key(token, settings, retry_on_failure=False)
7875

7976
return None
80-
81-
82-
def get_current_user(
83-
token: str = Depends(oauth2_scheme), settings: Settings = Depends(get_settings)
84-
) -> SessionUser:
85-
access_token = verify_jwt(token, settings=settings)
86-
return SessionUser(access_token=access_token)
87-
88-
89-
def user_is_admin(
90-
current_user: Annotated[SessionUser, Depends(get_current_user)],
91-
settings: Annotated[Settings, Depends(get_settings)],
92-
) -> SessionUser:
93-
if not current_user.is_admin(settings=settings):
94-
raise HTTPException(
95-
status_code=status.HTTP_403_FORBIDDEN,
96-
detail="You must be an admin to access this endpoint.",
97-
)
98-
return current_user

biocommons/groups.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from pydantic import StringConstraints
55
from sqlmodel import Session
66

7-
from auth0.client import Auth0Client
87
from db.core import BaseModel
98
from db.models import Auth0Role, BiocommonsGroup
109

@@ -33,18 +32,19 @@ class BiocommonsGroupCreate(BaseModel):
3332
short_name: str
3433
admin_roles: list[RoleId | Auth0Role]
3534

36-
def save(self, session: Session, auth0_client: Auth0Client) -> BiocommonsGroup:
35+
def save_group(self, session: Session) -> BiocommonsGroup:
3736
db_roles = []
3837
for role in self.admin_roles:
3938
if isinstance(role, Auth0Role):
4039
db_roles.append(role)
4140
else:
42-
role = Auth0Role.get_or_create_by_name(
41+
db_role = Auth0Role.get_by_name(
4342
role,
4443
session,
45-
auth0_client=auth0_client
4644
)
47-
db_roles.append(role)
45+
if db_role is None:
46+
raise ValueError(f"Role {role} doesn't exist in DB - create roles first")
47+
db_roles.append(db_role)
4848
group = BiocommonsGroup(
4949
group_id=self.group_id,
5050
name=self.name,

0 commit comments

Comments
 (0)