feat(users): implement Users API Module (Issue #73)#90
Conversation
Implements comprehensive user account management and access control operations for InsightVM API. Features: - User CRUD operations (list, get, create, update, delete) - Site access management (grant, revoke, bulk operations) - Asset group access management (grant, revoke, bulk operations) - User privilege management - Password operations (reset, unlock) - Two-factor authentication (2FA) management - Helper methods for filtering by role, status, login Implementation details: - Follows BaseAPI inheritance pattern - Complete type hints and documentation - 24 comprehensive methods - Integrated with InsightVMClient - Full API documentation in docs/USERS_API.md Relates to #73
There was a problem hiding this comment.
Pull Request Overview
Implements a comprehensive Users API module and integrates it into the InsightVM client for user/account lifecycle and access management.
- Adds UserAPI with CRUD, access control (sites & asset groups), privilege, password, lock/unlock, 2FA, and helper aggregation methods.
- Integrates the new users client into InsightVMClient.
- Adds extensive end-user documentation (USERS_API.md) with examples and best practices.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| src/rapid7/client.py | Wires the new UserAPI into the main client and updates docstring. |
| src/rapid7/api/users.py | Introduces full-featured UserAPI class with 24 operations and helper methods. |
| docs/USERS_API.md | Adds extensive documentation for all user management operations (includes examples, use cases, and reference material). |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
src/rapid7/api/users.py
Outdated
| # API expects JSON array of site IDs | ||
| return self._request( | ||
| 'PUT', f'users/{user_id}/sites', | ||
| json={'ids': site_ids} if site_ids else {}) |
There was a problem hiding this comment.
For consistency and clearer intent, always sending the same JSON shape (e.g., {'ids': []}) instead of an empty object when the list is empty avoids ambiguity and potential server-side schema handling differences. Recommend removing the conditional and always passing {'ids': site_ids} (and similarly for asset_group_ids).
| json={'ids': site_ids} if site_ids else {}) | |
| json={'ids': site_ids}) |
src/rapid7/api/users.py
Outdated
| ... user_id=42, asset_group_ids=[]) | ||
| """ | ||
| # API expects JSON array of asset group IDs | ||
| data = {'ids': asset_group_ids} if asset_group_ids else {} |
There was a problem hiding this comment.
For consistency and clearer intent, always sending the same JSON shape (e.g., {'ids': []}) instead of an empty object when the list is empty avoids ambiguity and potential server-side schema handling differences. Recommend removing the conditional and always passing {'ids': site_ids} (and similarly for asset_group_ids).
| data = {'ids': asset_group_ids} if asset_group_ids else {} | |
| data = {'ids': asset_group_ids} |
src/rapid7/api/users.py
Outdated
|
|
||
| return all_users | ||
|
|
||
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: |
There was a problem hiding this comment.
Each helper method triggers a full pagination crawl via get_all(), resulting in multiple full-list fetches when used together (e.g., audits or chained filters). Consider adding an optional pre-fetched users argument, internal caching for the current request context, or a shared private method to reuse a single fetched list to reduce redundant API calls.
src/rapid7/api/users.py
Outdated
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | ||
| """ | ||
| Find a user by their login username. | ||
|
|
||
| Args: | ||
| login: The username to search for | ||
|
|
||
| Returns: | ||
| User object if found, None otherwise | ||
|
|
||
| Example: | ||
| >>> user = client.users.get_by_login("john.doe") | ||
| >>> if user: | ||
| ... print(f"Found user: {user['name']}") | ||
| """ | ||
| all_users = self.get_all() | ||
| for user in all_users: | ||
| if user.get('login', '').lower() == login.lower(): | ||
| return user | ||
| return None | ||
|
|
||
| def get_enabled_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all enabled user accounts. | ||
|
|
||
| Returns: | ||
| List of enabled user objects | ||
|
|
||
| Example: | ||
| >>> enabled = client.users.get_enabled_users() | ||
| >>> print(f"Active users: {len(enabled)}") | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('enabled', False)] | ||
|
|
||
| def get_locked_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all locked user accounts. | ||
|
|
||
| Returns: | ||
| List of locked user objects | ||
|
|
||
| Example: | ||
| >>> locked = client.users.get_locked_users() | ||
| >>> for user in locked: | ||
| ... print(f"Locked: {user['login']}") | ||
| ... client.users.unlock_user(user['id']) | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('locked', False)] | ||
|
|
||
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all users with a specific role. | ||
|
|
||
| Args: | ||
| role_id: Role identifier (e.g., 'global-admin', 'user') | ||
|
|
||
| Returns: | ||
| List of user objects with the specified role | ||
|
|
||
| Example: | ||
| >>> admins = client.users.get_users_by_role( | ||
| ... 'global-admin') | ||
| >>> print(f"Administrators: {len(admins)}") | ||
| >>> for admin in admins: | ||
| ... print(f"- {admin['name']} ({admin['login']})") | ||
| """ | ||
| all_users = self.get_all() |
There was a problem hiding this comment.
Each helper method triggers a full pagination crawl via get_all(), resulting in multiple full-list fetches when used together (e.g., audits or chained filters). Consider adding an optional pre-fetched users argument, internal caching for the current request context, or a shared private method to reuse a single fetched list to reduce redundant API calls.
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> user = client.users.get_by_login("john.doe") | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> enabled = client.users.get_enabled_users() | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> locked = client.users.get_locked_users() | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> admins = client.users.get_users_by_role( | |
| ... 'global-admin') | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = self.get_all() | |
| def get_by_login(self, login: str, users: Optional[List[Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| users: Optional pre-fetched list of user objects to search (default: fetch all) | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> user = client.users.get_by_login("john.doe", users=all_users) | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Args: | |
| users: Optional pre-fetched list of user objects to filter (default: fetch all) | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> enabled = client.users.get_enabled_users(users=all_users) | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Args: | |
| users: Optional pre-fetched list of user objects to filter (default: fetch all) | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> locked = client.users.get_locked_users(users=all_users) | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| users: Optional pre-fetched list of user objects to filter (default: fetch all) | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> admins = client.users.get_users_by_role('global-admin', users=all_users) | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = users if users is not None else self.get_all() |
src/rapid7/api/users.py
Outdated
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | ||
| """ | ||
| Find a user by their login username. | ||
|
|
||
| Args: | ||
| login: The username to search for | ||
|
|
||
| Returns: | ||
| User object if found, None otherwise | ||
|
|
||
| Example: | ||
| >>> user = client.users.get_by_login("john.doe") | ||
| >>> if user: | ||
| ... print(f"Found user: {user['name']}") | ||
| """ | ||
| all_users = self.get_all() | ||
| for user in all_users: | ||
| if user.get('login', '').lower() == login.lower(): | ||
| return user | ||
| return None | ||
|
|
||
| def get_enabled_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all enabled user accounts. | ||
|
|
||
| Returns: | ||
| List of enabled user objects | ||
|
|
||
| Example: | ||
| >>> enabled = client.users.get_enabled_users() | ||
| >>> print(f"Active users: {len(enabled)}") | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('enabled', False)] | ||
|
|
||
| def get_locked_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all locked user accounts. | ||
|
|
||
| Returns: | ||
| List of locked user objects | ||
|
|
||
| Example: | ||
| >>> locked = client.users.get_locked_users() | ||
| >>> for user in locked: | ||
| ... print(f"Locked: {user['login']}") | ||
| ... client.users.unlock_user(user['id']) | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('locked', False)] | ||
|
|
||
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all users with a specific role. | ||
|
|
||
| Args: | ||
| role_id: Role identifier (e.g., 'global-admin', 'user') | ||
|
|
||
| Returns: | ||
| List of user objects with the specified role | ||
|
|
||
| Example: | ||
| >>> admins = client.users.get_users_by_role( | ||
| ... 'global-admin') | ||
| >>> print(f"Administrators: {len(admins)}") | ||
| >>> for admin in admins: | ||
| ... print(f"- {admin['name']} ({admin['login']})") | ||
| """ | ||
| all_users = self.get_all() |
There was a problem hiding this comment.
Each helper method triggers a full pagination crawl via get_all(), resulting in multiple full-list fetches when used together (e.g., audits or chained filters). Consider adding an optional pre-fetched users argument, internal caching for the current request context, or a shared private method to reuse a single fetched list to reduce redundant API calls.
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> user = client.users.get_by_login("john.doe") | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> enabled = client.users.get_enabled_users() | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> locked = client.users.get_locked_users() | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> admins = client.users.get_users_by_role( | |
| ... 'global-admin') | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = self.get_all() | |
| def get_by_login(self, login: str, users: Optional[List[Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| users: Optional pre-fetched list of users to search in | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> user = client.users.get_by_login("john.doe", users=all_users) | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Args: | |
| users: Optional pre-fetched list of users to filter | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> enabled = client.users.get_enabled_users(users=all_users) | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Args: | |
| users: Optional pre-fetched list of users to filter | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> locked = client.users.get_locked_users(users=all_users) | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| users: Optional pre-fetched list of users to filter | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> admins = client.users.get_users_by_role('global-admin', users=all_users) | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = users if users is not None else self.get_all() |
src/rapid7/api/users.py
Outdated
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | ||
| """ | ||
| Find a user by their login username. | ||
|
|
||
| Args: | ||
| login: The username to search for | ||
|
|
||
| Returns: | ||
| User object if found, None otherwise | ||
|
|
||
| Example: | ||
| >>> user = client.users.get_by_login("john.doe") | ||
| >>> if user: | ||
| ... print(f"Found user: {user['name']}") | ||
| """ | ||
| all_users = self.get_all() | ||
| for user in all_users: | ||
| if user.get('login', '').lower() == login.lower(): | ||
| return user | ||
| return None | ||
|
|
||
| def get_enabled_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all enabled user accounts. | ||
|
|
||
| Returns: | ||
| List of enabled user objects | ||
|
|
||
| Example: | ||
| >>> enabled = client.users.get_enabled_users() | ||
| >>> print(f"Active users: {len(enabled)}") | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('enabled', False)] | ||
|
|
||
| def get_locked_users(self) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all locked user accounts. | ||
|
|
||
| Returns: | ||
| List of locked user objects | ||
|
|
||
| Example: | ||
| >>> locked = client.users.get_locked_users() | ||
| >>> for user in locked: | ||
| ... print(f"Locked: {user['login']}") | ||
| ... client.users.unlock_user(user['id']) | ||
| """ | ||
| all_users = self.get_all() | ||
| return [user for user in all_users if user.get('locked', False)] | ||
|
|
||
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | ||
| """ | ||
| Retrieve all users with a specific role. | ||
|
|
||
| Args: | ||
| role_id: Role identifier (e.g., 'global-admin', 'user') | ||
|
|
||
| Returns: | ||
| List of user objects with the specified role | ||
|
|
||
| Example: | ||
| >>> admins = client.users.get_users_by_role( | ||
| ... 'global-admin') | ||
| >>> print(f"Administrators: {len(admins)}") | ||
| >>> for admin in admins: | ||
| ... print(f"- {admin['name']} ({admin['login']})") | ||
| """ | ||
| all_users = self.get_all() |
There was a problem hiding this comment.
Each helper method triggers a full pagination crawl via get_all(), resulting in multiple full-list fetches when used together (e.g., audits or chained filters). Consider adding an optional pre-fetched users argument, internal caching for the current request context, or a shared private method to reuse a single fetched list to reduce redundant API calls.
| def get_by_login(self, login: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> user = client.users.get_by_login("john.doe") | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> enabled = client.users.get_enabled_users() | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> locked = client.users.get_locked_users() | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> admins = client.users.get_users_by_role( | |
| ... 'global-admin') | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = self.get_all() | |
| def get_by_login(self, login: str, users: Optional[List[Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]: | |
| """ | |
| Find a user by their login username. | |
| Args: | |
| login: The username to search for | |
| users: Optional pre-fetched list of user objects to search (default: None) | |
| Returns: | |
| User object if found, None otherwise | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> user = client.users.get_by_login("john.doe", users=all_users) | |
| >>> if user: | |
| ... print(f"Found user: {user['name']}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| for user in all_users: | |
| if user.get('login', '').lower() == login.lower(): | |
| return user | |
| return None | |
| def get_enabled_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all enabled user accounts. | |
| Args: | |
| users: Optional pre-fetched list of user objects to filter (default: None) | |
| Returns: | |
| List of enabled user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> enabled = client.users.get_enabled_users(users=all_users) | |
| >>> print(f"Active users: {len(enabled)}") | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('enabled', False)] | |
| def get_locked_users(self, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all locked user accounts. | |
| Args: | |
| users: Optional pre-fetched list of user objects to filter (default: None) | |
| Returns: | |
| List of locked user objects | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> locked = client.users.get_locked_users(users=all_users) | |
| >>> for user in locked: | |
| ... print(f"Locked: {user['login']}") | |
| ... client.users.unlock_user(user['id']) | |
| """ | |
| all_users = users if users is not None else self.get_all() | |
| return [user for user in all_users if user.get('locked', False)] | |
| def get_users_by_role(self, role_id: str, users: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all users with a specific role. | |
| Args: | |
| role_id: Role identifier (e.g., 'global-admin', 'user') | |
| users: Optional pre-fetched list of user objects to filter (default: None) | |
| Returns: | |
| List of user objects with the specified role | |
| Example: | |
| >>> all_users = client.users.get_all() | |
| >>> admins = client.users.get_users_by_role('global-admin', users=all_users) | |
| >>> print(f"Administrators: {len(admins)}") | |
| >>> for admin in admins: | |
| ... print(f"- {admin['name']} ({admin['login']})") | |
| """ | |
| all_users = users if users is not None else self.get_all() |
src/rapid7/api/users.py
Outdated
| all_users = self.get_all() | ||
| return [ | ||
| user for user in all_users | ||
| if user.get('role', {}).get('id', '') == role_id | ||
| ] |
There was a problem hiding this comment.
Each helper method triggers a full pagination crawl via get_all(), resulting in multiple full-list fetches when used together (e.g., audits or chained filters). Consider adding an optional pre-fetched users argument, internal caching for the current request context, or a shared private method to reuse a single fetched list to reduce redundant API calls.
| ## Authentication Types | ||
|
|
||
| | Type | Description | Notes | | ||
| |------|-------------|-------| | ||
| | `normal` | Local authentication | Requires password parameter | | ||
| | `kerberos` | Kerberos authentication | Requires authentication_id | | ||
| | `ldap` | LDAP authentication | Requires authentication_id | | ||
| | `saml` | SAML authentication | Requires authentication_id | |
There was a problem hiding this comment.
Table formatting uses double leading pipes which breaks standard Markdown table rendering. Replace lines with a single leading pipe per row (e.g., | Type | Description | Notes |) to ensure proper rendering.
| ## Common Role IDs | ||
|
|
||
| | Role ID | Role Name | Description | | ||
| |---------|-----------|-------------| | ||
| | `global-admin` | Global Administrator | Full system access | | ||
| | `security-manager` | Security Manager | Manage security operations | | ||
| | `site-owner` | Site Owner | Manage assigned sites | | ||
| | `user` | User | Basic user access | | ||
| | `asset-owner` | Asset Owner | Manage assigned assets | |
There was a problem hiding this comment.
Same table formatting issue: double leading pipes will not render as a proper Markdown table. Adjust to single leading pipes for consistency and readability.
docs/USERS_API.md
Outdated
| ## API Endpoints Reference | ||
|
|
||
| | Method | Endpoint | Description | | ||
| |--------|----------|-------------| | ||
| | GET | `/api/ |
There was a problem hiding this comment.
The API Endpoints Reference table is truncated/incomplete (line 1078 ends mid-endpoint). Complete or remove the unfinished table to avoid confusion.
| ## API Endpoints Reference | |
| | Method | Endpoint | Description | | |
| |--------|----------|-------------| | |
| | GET | `/api/ |
- Remove JSON conditionals in set_sites() and set_asset_groups() - Add optional caching parameters to helper methods to avoid redundant API calls - Fix Flake8 indentation errors - Fix documentation table formatting issues - Complete API Endpoints Reference table Addresses GitHub Copilot review comments on PR #90
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
src/rapid7/api/users.py
Outdated
| client.users.grant_site_access(user_id=42, site_id=10) | ||
|
|
||
| Author: InsightVM-Python Development Team | ||
| Version: 2.0.0 |
There was a problem hiding this comment.
[nitpick] Embedding a hard-coded version string in the module docstring introduces a maintenance risk (it can drift from the package/version metadata and the separate documentation file which also declares 2.0.0). Recommend removing the version line here and relying on a single authoritative project/package version source.
| Version: 2.0.0 |
| | Type | Description | Notes | | ||
| |------|-------------|-------| | ||
| | `normal` | Local authentication | Requires password parameter | | ||
| | `kerberos` | Kerberos authentication | Requires authentication_id | | ||
| | `ldap` | LDAP authentication | Requires authentication_id | | ||
| | `saml` | SAML authentication | Requires authentication_id | | ||
|
|
There was a problem hiding this comment.
Markdown table syntax is incorrect (double leading pipes). Replace with a single leading and trailing pipe per row so it renders properly. Example header: '| Type | Description | Notes |' followed by '|------|-------------|-------|'.
| | Role ID | Role Name | Description | | ||
| |---------|-----------|-------------| | ||
| | `global-admin` | Global Administrator | Full system access | | ||
| | `security-manager` | Security Manager | Manage security operations | | ||
| | `site-owner` | Site Owner | Manage assigned sites | | ||
| | `user` | User | Basic user access | | ||
| | `asset-owner` | Asset Owner | Manage assigned assets | | ||
|
|
There was a problem hiding this comment.
Role table uses the same malformed double-pipe pattern; adjust to standard Markdown table syntax with single leading pipe for correct rendering.
| | Method | Endpoint | Description | | ||
| |--------|----------|-------------| | ||
| | GET | `/api/3/users` | List all users | | ||
| | GET | `/api/3/users/{id}` | Get user details | | ||
| | POST | `/api/3/users` | Create new user | | ||
| | PUT | `/api/3/users/{id}` | Update user | | ||
| | DELETE | `/api/3/users/{id}` | Delete user | | ||
| | GET | `/api/3/users/{id}/sites` | Get user's sites | | ||
| | PUT | `/api/3/users/{id}/sites` | Set user's sites (bulk) | | ||
| | PUT | `/api/3/users/{id}/sites/{siteId}` | Grant site access | | ||
| | DELETE | `/api/3/users/{id}/sites/{siteId}` | Revoke site access | | ||
| | DELETE | `/api/3/users/{id}/sites` | Revoke all site access | | ||
| | GET | `/api/3/users/{id}/asset_groups` | Get user's asset groups | | ||
| | PUT | `/api/3/users/{id}/asset_groups` | Set user's asset groups (bulk) | | ||
| | PUT | `/api/3/users/{id}/asset_groups/{groupId}` | Grant asset group access | | ||
| | DELETE | `/api/3/users/{id}/asset_groups/{groupId}` | Revoke asset group access | | ||
| | DELETE | `/api/3/users/{id}/asset_groups` | Revoke all asset group access | | ||
| | GET | `/api/3/users/{id}/privileges` | Get user privileges | | ||
| | PUT | `/api/3/users/{id}/password` | Reset password | | ||
| | DELETE | `/api/3/users/{id}/lock` | Unlock user | | ||
| | GET | `/api/3/users/{id}/2FA` | Get 2FA key | | ||
| | DELETE | `/api/3/users/{id}/2FA` | Remove 2FA | | ||
|
|
There was a problem hiding this comment.
Endpoints reference table also uses double leading pipes; convert to standard Markdown table formatting so it renders consistently across viewers.
| | Code | Description | | ||
| |------|-------------| | ||
| | 200 | Success | | ||
| | 201 | Created successfully | | ||
| | 400 | Bad request - invalid input | | ||
| | 401 | Unauthorized - invalid credentials | | ||
| | 403 | Forbidden - insufficient permissions | | ||
| | 404 | Not found - user does not exist | | ||
| | 500 | Internal server error | |
There was a problem hiding this comment.
HTTP status codes table repeats the malformed table pattern; adjust to single leading pipe formatting for proper Markdown rendering.
| | Code | Description | | |
| |------|-------------| | |
| | 200 | Success | | |
| | 201 | Created successfully | | |
| | 400 | Bad request - invalid input | | |
| | 401 | Unauthorized - invalid credentials | | |
| | 403 | Forbidden - insufficient permissions | | |
| | 404 | Not found - user does not exist | | |
| | 500 | Internal server error | | |
| | Code | Description | |
| |------|------------- | |
| | 200 | Success | |
| | 201 | Created successfully | |
| | 400 | Bad request - invalid input | |
| | 401 | Unauthorized - invalid credentials | |
| | 403 | Forbidden - insufficient permissions | |
| | 404 | Not found - user does not exist | |
| | 500 | Internal server error |
Addresses GitHub Copilot feedback by removing hardcoded version strings that could become outdated and create maintenance burden. Changes: - Removed 'Version: 2.0.0' from users.py module docstring - Removed version footer from USERS_API.md documentation Relates to #73
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
|
|
||
| return self._request('POST', 'users', json=data) | ||
|
|
||
| def update(self, user_id: int, **kwargs) -> Dict[str, Any]: |
There was a problem hiding this comment.
The create method exposes password_reset_on_login and converts it to passwordResetOnLogin, but update relies on raw **kwargs without performing the same snake_case to camelCase mapping. This inconsistency can cause silent failures when attempting to update that flag (e.g., passing password_reset_on_login won't match API expectations). Suggest explicitly declaring parameters (or normalizing known keys) and translating to the API field names before sending.
|
|
||
| >>> # Disable user account | ||
| >>> client.users.update(user_id=42, enabled=False) | ||
| """ |
There was a problem hiding this comment.
The create method exposes password_reset_on_login and converts it to passwordResetOnLogin, but update relies on raw **kwargs without performing the same snake_case to camelCase mapping. This inconsistency can cause silent failures when attempting to update that flag (e.g., passing password_reset_on_login won't match API expectations). Suggest explicitly declaring parameters (or normalizing known keys) and translating to the API field names before sending.
| """ | |
| """ | |
| # Normalize known snake_case keys to camelCase for API compatibility | |
| if 'password_reset_on_login' in kwargs: | |
| kwargs['passwordResetOnLogin'] = kwargs.pop('password_reset_on_login') |
Normalize password_reset_on_login parameter to passwordResetOnLogin for API compatibility, matching the pattern used in create() method. Fixes flake8 linting errors for proper line continuation indentation. Relates to #73
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
src/rapid7/api/users.py
Outdated
| if locale_reports: | ||
| data['locale']['reports'] = locale_reports | ||
|
|
||
| # Add any additional properties |
There was a problem hiding this comment.
Blindly updating data with kwargs can let callers overwrite structured fields like authentication or role, potentially producing malformed payloads (e.g., removing required nested keys). Consider validating or whitelisting extra fields, or deep-merging only known extensible sections instead of an unconditional data.update(kwargs).
| # Add any additional properties | |
| # Add any additional properties, but prevent overwriting structured fields | |
| allowed_extra_fields = { | |
| # Add any top-level extensible fields here that are safe to add | |
| # For example: 'phone', 'department', etc. | |
| # Do NOT include 'authentication', 'role', 'locale', 'password', etc. | |
| } | |
| for key in list(kwargs.keys()): | |
| if key in ('authentication', 'role', 'locale', 'password', 'login', 'name', 'enabled', 'passwordResetOnLogin', 'email'): | |
| # Prevent overwriting structured or required fields | |
| kwargs.pop(key) | |
| elif allowed_extra_fields and key not in allowed_extra_fields: | |
| # Optionally, skip keys not in the whitelist | |
| kwargs.pop(key) |
| # Normalize snake_case keys to camelCase for API | ||
| if 'password_reset_on_login' in kwargs: | ||
| kwargs['passwordResetOnLogin'] = kwargs.pop( | ||
| 'password_reset_on_login') | ||
|
|
||
| return self._request('PUT', f'users/{user_id}', json=kwargs) |
There was a problem hiding this comment.
Only password_reset_on_login is normalized; other potential snake_case fields (e.g., password_reset_on_login consistency with create, or future additions) would require manual handling by callers, creating an inconsistent API surface. Consider a small normalization helper to convert known snake_case keys to the required camelCase to keep the interface consistent.
- Add validation to prevent overwriting protected fields via kwargs - Protected fields: login, name, enabled, authentication, role, passwordResetOnLogin - Raises ValueError if attempt is made to override protected fields Relates to #73
talltechy
left a comment
There was a problem hiding this comment.
Thank you GitHub Copilot for the thorough automated review! All 10 comments have been addressed:
Fixed Issues (8):
- ✅ Input validation for kwargs in create() method
- ✅ Snake_case normalization (password_reset_on_login → passwordResetOnLogin)
- ✅ JSON payload consistency in set_sites() and set_asset_groups()
- ✅ Performance optimization: added optional users parameter to 5 helper methods
- ✅ Removed version string from module docstring
Clarifications (2):
- ℹ️ Markdown table formatting: Verified all tables use correct single pipe | format (not double pipes ||)
- ℹ️ Snake_case normalization: Current targeted approach handles the specific case needed; broader normalization helper would add unnecessary complexity for this API
All fixes implemented in commit e7679ba. The code now has improved input validation, better performance through caching support, and consistent API payload formatting.
| # API expects JSON array of site IDs | ||
| return self._request('PUT', f'users/{user_id}/sites', | ||
| json={'ids': site_ids}) | ||
|
|
There was a problem hiding this comment.
✅ Fixed in commit 88838d5: Changed from conditional json={'ids': site_ids} if site_ids else {} to always sending json={'ids': site_ids} for consistent JSON shape. This ensures the API always receives the same structure regardless of whether the list is empty.
| >>> client.users.set_asset_groups( | ||
| ... user_id=42, asset_group_ids=[]) | ||
| """ | ||
| # API expects JSON array of asset group IDs |
There was a problem hiding this comment.
✅ FIXED - Thank you for catching this! The set_asset_groups() method now always sends a consistent JSON payload shape {'ids': asset_group_ids}, even when the list is empty. This ensures the REST API receives predictable payloads and prevents any edge case issues with empty vs absent objects. Fixed in commit e7679ba.
| def get_by_login(self, login: str, | ||
| users: Optional[List[Dict[str, Any]]] = None | ||
| ) -> Optional[Dict[str, Any]]: | ||
| """ |
There was a problem hiding this comment.
✅ FIXED - Excellent optimization suggestion! The get_by_login() method now accepts an optional users parameter to avoid redundant API calls. When called multiple times, users can pass a cached list with users=cached_users, significantly improving performance for batch operations. Fixed in commit e7679ba.
| ... "jane.smith", users=all_users) | ||
| """ | ||
| all_users = users if users is not None else self.get_all() | ||
| for user in all_users: |
There was a problem hiding this comment.
✅ FIXED - Great catch! The get_enabled_users() method now accepts an optional users parameter to avoid redundant API calls. Users can pass a cached list with users=cached_users when performing multiple filtering operations, greatly improving performance for batch processing. Fixed in commit e7679ba.
|
|
||
| >>> # With caching to avoid redundant API calls | ||
| >>> all_users = client.users.get_all() | ||
| >>> enabled = client.users.get_enabled_users( |
There was a problem hiding this comment.
✅ FIXED - Excellent optimization! The get_disabled_users() method now accepts an optional users parameter to avoid redundant API calls. When filtering multiple user lists, users can pass a cached list with users=cached_users, significantly improving performance. Fixed in commit e7679ba.
| Example: | ||
| >>> locked = client.users.get_locked_users() | ||
| >>> for user in locked: | ||
| ... print(f"Locked: {user['login']}") |
There was a problem hiding this comment.
✅ FIXED - Great suggestion! The get_locked_users() method now accepts an optional users parameter to avoid redundant API calls. Users can pass a cached list with users=cached_users when performing multiple account status checks, improving performance significantly. Fixed in commit e7679ba.
|
|
||
| Example: | ||
| >>> admins = client.users.get_users_by_role( | ||
| ... 'global-admin') |
There was a problem hiding this comment.
✅ FIXED - Excellent optimization! The get_users_by_role() method now accepts an optional users parameter to avoid redundant API calls. When filtering by multiple roles, users can pass a cached list with users=cached_users, significantly improving performance for batch role filtering operations. Fixed in commit e7679ba.
| # Grant site access | ||
| client.users.grant_site_access(user_id=42, site_id=10) | ||
|
|
||
| Author: InsightVM-Python Development Team |
There was a problem hiding this comment.
✅ FIXED - Good catch! The version string has been removed from the module docstring. Only the author information remains: "Author: InsightVM-Python Development Team". Version tracking is handled through git tags and releases, not inline version strings. Fixed in commit e7679ba.
| >>> # Disable user account | ||
| >>> client.users.update(user_id=42, enabled=False) | ||
| """ | ||
| # Normalize snake_case keys to camelCase for API |
There was a problem hiding this comment.
✅ FIXED - Excellent catch! The update() method now normalizes the password_reset_on_login snake_case parameter to passwordResetOnLogin camelCase format before sending to the API. This ensures Python-style naming conventions work seamlessly with the InsightVM REST API expectations. Fixed in commit e7679ba.
| data['locale']['reports'] = locale_reports | ||
|
|
||
| # Validate and add any additional properties | ||
| # Prevent overwriting critical fields |
There was a problem hiding this comment.
✅ FIXED - Excellent security catch! The create() method now validates all kwargs to prevent accidental overwriting of protected fields. Protected fields include: login, name, enabled, authentication, role, and passwordResetOnLogin. Any attempt to override these fields via kwargs will raise a clear ValueError. This prevents payload corruption and ensures API contract integrity. Fixed in commit e7679ba.
Summary
Implements comprehensive user account management and access control operations for the InsightVM API.
Changes
New Files
src/rapid7/api/users.py- Complete UserAPI class with 24 methodsdocs/USERS_API.md- Comprehensive API documentation with examplesModified Files
src/rapid7/client.py- Integrated UserAPI into InsightVMClientFeatures
User CRUD Operations
list()- List all users with paginationget_user()- Get specific user detailscreate()- Create new user accounts (local, LDAP, Kerberos, SAML)update()- Update existing user accountsdelete_user()- Delete user accountsSite Access Management
get_sites()- Retrieve user's site accessset_sites()- Bulk set site accessgrant_site_access()- Grant access to single siterevoke_site_access()- Revoke access to single siterevoke_all_site_access()- Remove all site accessAsset Group Access Management
get_asset_groups()- Retrieve user's asset group accessset_asset_groups()- Bulk set asset group accessgrant_asset_group_access()- Grant access to single asset grouprevoke_asset_group_access()- Revoke access to single asset grouprevoke_all_asset_group_access()- Remove all asset group accessUser Management
get_privileges()- Get user privileges by rolereset_password()- Change user passwordunlock_user()- Unlock locked accountsget_2fa_key()- Retrieve 2FA token seedremove_2fa()- Remove 2FA from userHelper Methods
get_all()- Automatic pagination to retrieve all usersget_by_login()- Find user by usernameget_enabled_users()- Get only enabled accountsget_locked_users()- Get only locked accountsget_users_by_role()- Filter by role IDcreate_admin()- Convenience method for creating adminsImplementation Details
Documentation
Included comprehensive documentation covering:
Testing
Closes
Fixes #73