|
21 | 21 |
|
22 | 22 |
|
23 | 23 | class IdentityModel(TypedDict):
|
24 |
| - username: str |
25 |
| - given_name: Optional[str] |
| 24 | + # see the JupyterLab IUser model for definitions |
| 25 | + |
| 26 | + username: str # the only truly required field |
| 27 | + |
| 28 | + # these fields are derived from username if not specified |
| 29 | + name: str |
| 30 | + display_name: str |
| 31 | + initials: str |
| 32 | + |
| 33 | + # these fields are left as None if undefined |
| 34 | + avatar_url: Optional[str] |
| 35 | + color: Optional[str] |
| 36 | + |
| 37 | + # Jupyter Server permissions |
| 38 | + # as a dict of permitted {"resource": ["actions"]} |
26 | 39 | permissions: Dict[str, List[str]]
|
27 | 40 |
|
28 | 41 |
|
| 42 | +def _fill_defaults(identity: IdentityModel) -> IdentityModel: |
| 43 | + """Fill out default fields in the identity model |
| 44 | +
|
| 45 | + - Ensures all values are defined |
| 46 | + - Fills out derivative values for name fields fields |
| 47 | + - Fills out null values for optional fields |
| 48 | + """ |
| 49 | + |
| 50 | + # username is the only truly required field |
| 51 | + if not identity.get("username"): |
| 52 | + raise ValueError(f"identity.username must not be empty: {identity}") |
| 53 | + |
| 54 | + # derive name fields from username -> name -> display name -> initials |
| 55 | + if not identity.get("name"): |
| 56 | + identity["name"] = identity["username"] |
| 57 | + if not identity.get("display_name"): |
| 58 | + identity["display_name"] = identity["name"] |
| 59 | + if not identity.get("initials"): |
| 60 | + identity["initials"] = identity["display_name"][0] |
| 61 | + |
| 62 | + # fields that should be defined, but use null if no information is provided |
| 63 | + for key in ("avatar_url", "color"): |
| 64 | + identity.setdefault(key, None) |
| 65 | + return identity |
| 66 | + |
| 67 | + |
29 | 68 | class IdentityHandler(APIHandler):
|
30 | 69 | """Get the current user's identity model"""
|
31 | 70 |
|
32 | 71 | @web.authenticated
|
33 | 72 | def get(self):
|
34 |
| - resources: List[str] = self.get_argument("resources") or [] |
35 |
| - actions: List[str] = self.get_argument("actions") or [ |
36 |
| - "read", |
37 |
| - "write", |
38 |
| - "execute", |
39 |
| - ] |
| 73 | + resources_json: str = self.get_argument("resources") |
| 74 | + bad_resources_msg = f'resources should be a JSON dict of {{"resource": ["action",]}}, got {resources_json!r}' |
| 75 | + if resources_json: |
| 76 | + try: |
| 77 | + resources = json.loads(resources_json) |
| 78 | + except ValueError: |
| 79 | + raise web.HTTPError(400, bad_resources_msg) |
| 80 | + if not isinstance(resources, dict): |
| 81 | + raise web.HTTPError(400, bad_resources_msg) |
| 82 | + |
40 | 83 | permissions: Dict[str, List[str]] = {}
|
41 | 84 | user = self.current_user
|
42 |
| - for resource in resources: |
| 85 | + |
| 86 | + for resource, actions in resources.items(): |
| 87 | + if ( |
| 88 | + not isinstance(resource, str) |
| 89 | + or not isinstance(actions, list) |
| 90 | + or not all(isinstance(action, str) for action in actions) |
| 91 | + ): |
| 92 | + raise web.HTTPError(400, bad_resources_msg) |
| 93 | + |
43 | 94 | allowed = permissions[resource] = []
|
44 | 95 | for action in actions:
|
45 | 96 | if self.authorizer.is_authorized(self, user=user, resource=resource, action=action):
|
46 | 97 | allowed.append(action)
|
| 98 | + |
47 | 99 | user_model: IdentityModel = dict(
|
48 | 100 | permissions=permissions,
|
49 | 101 | **self.authorizer.user_model(user),
|
50 | 102 | )
|
| 103 | + user_model = _fill_defaults(user_model) |
51 | 104 | self.write(json.dumps(user_model))
|
52 | 105 |
|
53 | 106 |
|
|
0 commit comments