Skip to content

Feature/services/authenticated endpoints#336

Open
mazenelabd wants to merge 29 commits intodevfrom
feature/services/authenticated-endpoints
Open

Feature/services/authenticated endpoints#336
mazenelabd wants to merge 29 commits intodevfrom
feature/services/authenticated-endpoints

Conversation

@mazenelabd
Copy link
Contributor

@mazenelabd mazenelabd commented Jan 18, 2026

Service-Level Authentication and Password Utilities

Overview

This PR introduces an authentication system for Mindtrace services, enabling instance-level authentication with Bearer token support, along with password hashing utilities in the core module.

Key Features

Service-Level Authentication

  • Instance-based authentication
  • Scope-based endpoints: Support for PUBLIC and AUTHENTICATED endpoint scopes via Scope enum
  • Token verification: Flexible token verification with support for both sync and async verifiers
  • User injection: FastAPI dependency injection for accessing current user information in endpoints

Connection Manager Header Support

  • Default headers: Set default headers (e.g., Authorization Bearer tokens) on connection managers for all requests
  • Per-request headers: Override default headers on a per-request basis

Password Hashing Utilities

  • Secure password hashing: New hash_password() and verify_password() utilities using Argon2 (via pwdlib)
  • Advanced access: get_password_hasher() for direct access to pwdlib features
  • Core module integration: Password utilities exported from mindtrace.core for easy access

Samples

  • CRUD service sample: Complete example of CRUD operations using the services module
  • Authenticated CRUD service: Full authentication example with MongoDB, JWT tokens, and user management
  • Header usage examples: Demonstrates both default and per-request header patterns

Technical Changes

Services Module (mindtrace/services)

  • Added set_token_verifier() method to Service class for instance-level token verification
  • Added get_auth_dependency() and get_current_user_dependency() for FastAPI integration
  • Enhanced add_endpoint() to support scope parameter for authentication control
  • Updated ConnectionManager with set_default_headers() and clear_default_headers() methods
  • Modified generate_connection_manager() to support header forwarding in generated methods

Core Module (mindtrace/core)

  • New password.py utility module with password hashing functions
  • Added pwdlib>=0.3.0 as a dependency
  • Updated exports to include password utilities

Testing

  • 100% code coverage for all new lines

Usage

For Services Using Authentication

# Instance-level authentication (new way)
from mindtrace.services import Service, Scope

class MyService(Service):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_token_verifier(self.verify_token)
        
        self.add_endpoint("protected", self.get_data, scope=Scope.AUTHENTICATED)
    
    def verify_token(self, token: str) -> dict:
        # Your verification logic
        return {"user_id": "123"}

For Connection Managers Using Headers

# Set default headers (recommended)
cm = MyService.launch()
cm.set_default_headers({"Authorization": f"Bearer {token}"})
result = cm.protected_endpoint()  # Automatically includes headers

# Or use per-request headers
result = cm.protected_endpoint(headers={"Authorization": f"Bearer {token}"})

@mazenelabd mazenelabd requested a review from vik-rant January 18, 2026 12:31
@mazenelabd mazenelabd self-assigned this Jan 18, 2026
@mazenelabd mazenelabd linked an issue Jan 19, 2026 that may be closed by this pull request
Comment on lines +16 to +55
class _TokenVerifierState:
"""Module-level state for token verifier."""

verifier: Optional[Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]] = None


_state = _TokenVerifierState()


def set_token_verifier(verifier: Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]):
"""Set a custom token verification function.

The verifier function can be either synchronous or asynchronous:
- Accept a token string as input
- Return a dict with user information if token is valid (or Awaitable[dict] for async)
- Raise HTTPException if token is invalid

Args:
verifier: A function that verifies tokens and returns user info (sync or async)

Example:
# Synchronous verifier
def verify_token(token: str) -> dict:
# Verify JWT token, check signature, etc.
# Return user info like {"user_id": "123", "username": "john"}
pass

# Asynchronous verifier
async def verify_token_async(token: str) -> dict:
# Async token verification (e.g., checking against database)
pass

set_token_verifier(verify_token) # or verify_token_async
"""
_state.verifier = verifier


def get_token_verifier() -> Optional[Callable[[str], dict]]:
"""Get the current token verification function."""
return _state.verifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

module-level global token_verifier ends up getting shared by all services in a process, and cumbersome to test.
consider making it an instance variable of Service.

Comment on lines +93 to +94
if verifier is None:
return {"token": token, "authenticated": True}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failing to call set_token_verifier() (dev oversight or otherwise) on authenticated endpoints will silently grant all access.
should raise exception when no verifier is set, and authentication is expected.

Comment on lines +105 to +110
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token verification failed: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
) from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementation details, stack traces, file paths etc. might end up in the API responses.
consider using a general response "Token verification failed" and logging the actual error server-side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say it is a better practice to keep the 401 but we can replace it if you think it is better to raise some other error message
https://security.stackexchange.com/questions/110759/unauthorized-access-to-a-page-when-or-if-you-should-return-401-http-status-cod#/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am not suggesting to remove the 401.
We should return a 401 response but without the details of e (which may include local implementation-specific details.
Instead log the error details from e and then continue raising 401.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

class _TokenVerifierState:
"""Module-level state for token verifier."""

verifier: Optional[Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
verifier: Optional[Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]] = None
verifier: Callable[[str], dict | Awaitable[dict]] | None = None

_state = _TokenVerifierState()


def set_token_verifier(verifier: Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def set_token_verifier(verifier: Union[Callable[[str], dict], Callable[[str], Awaitable[dict]]]):
def set_token_verifier(verifier: Callable[[str], dict | Awaitable[dict]]):

_state.verifier = verifier


def get_token_verifier() -> Optional[Callable[[str], dict]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def get_token_verifier() -> Optional[Callable[[str], dict]]:
def get_token_verifier() -> Callable[[str], dict | Awaitable[dict]] | None:


try:
# Check if verifier is async
if inspect.iscoroutinefunction(verifier):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if inspect.iscoroutinefunction(verifier):
if inspect.isawaitable(verifier):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. inspect.isawaitable() doesn't work on function objects, it only works on coroutines/futures. We need to check before calling whether the function is async, so I think iscoroutinefunction() is the right approach here. I also came across this aio-libs/sphinxcontrib-asyncio#4.
Please let me know if I am missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right.
I missed adding a key line in the suggestion.
The general idea is:

async def verify_token(token: str) -> dict:
    try:
        user_info = verifier(token)  # missed this line earlier. If sync, we already have a result, otherwise check if we need to await
        if inspect.isawaitable(user_info):
            user_info = await user_info
        return user_info
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(401) from e

Sometimes sync functions may return awaitables (Future, a wrapped async def, or something with an __await__ or an async __call__), or if token verifier is decorated (logging, retry, caching, tracing, etc.), then:

inspect.iscoroutinefunction(verifier)

will return False. But:

res = verifier(token)
inspect.isawaitable(res)

will handle all these cases.

iscoroutinefunction() answers "was this written as async def?"
isawaitable() answers "do we need to await this now?"
Runtime execution should care about the second one IMO.

Comment on lines +584 to +589
if isinstance(scope, str):
try:
scope = Scope(scope.lower())
except ValueError:
self.logger.warning("Invalid scope '%s', defaulting to PUBLIC", scope)
scope = Scope.PUBLIC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's prefer to raise and fail fast when an incorrect scope str is provided while adding endpoints.
currently a typo like scope="authenticate" will just log a warning and make the endpoint public.

# Add authentication dependency if required
dependencies = list(api_route_kwargs.get("dependencies", []))
if auth_dependency is not None:
dependencies.append(auth_dependency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auth_dependency should be inserted first to perform auth check first and fail fast?

from mindtrace.core import TaskSchema
from mindtrace.database import MindtraceDocument, MongoMindtraceODM
from mindtrace.services import Scope, Service, set_token_verifier
from mindtrace.services.core.auth import verify_token as get_current_user
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confusing alias change from verify_token (semantically f(token: str) -> bool) to get_current_user (semantically f(?) -> User/dict/?) in the very first sample usage a user encounters. What is this function supposed to do?

updated_at=user.updated_at.isoformat(),
)

async def get_user(self, user_id: str, current_user: dict = Depends(get_current_user)) -> UserOutput:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends(get_current_user) will run the auth check a second time. Would have already been called once when we hit the endpoint since it is added as Scope.AUTHENTICATED in add_endpoint()?

Comment on lines +439 to +442
set_service_instance(self)

# Set up token verification (use async version)
self.set_user_authenticator(verify_token_async)
Copy link
Contributor

@vik-rant vik-rant Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of a module-level verify_token_async and a freestanding set_service_instance, can it be a bound method on the service, e.g. self.verify_token_async and no need for global set_service_instance stuff.
Overall i think we should clarify the following processes:

  • extraction of token via schemes
  • verifying a token, and extracting claims
  • loading user details from verified claims
  • usage of scope.AUTHENTICATED on self.add_endpoint() vs (or +) current_user=Depends(self.get_current_user) on the endpoint function definition. Which one does what, how to use them, is it one vs the other or both? Is the result from one cached and fed to the other, or is JWT decoding + DB fetch being repeated?

example flow for an endpoint seems to be:

Request to /get_user with Bearer token
    │
    ├─► scope=Scope.AUTHENTICATED triggers _create_verify_token_dependency()
    │       └─► calls verify_token_async(token)  ← FIRST CALL (DB lookup, JWT decode)
    │
    └─► Depends(get_current_user) triggers _create_get_user_dependency()
            └─► calls verify_token_async(token)  ← SECOND CALL (same work repeated)

Comment on lines +236 to +262
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash.

Uses pwdlib with Argon2 as recommended by FastAPI.

Args:
plain_password: Plain text password
hashed_password: Hashed password from database

Returns:
True if password matches, False otherwise
"""
return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
"""Hash a password using Argon2.

Uses pwdlib with Argon2 as recommended by FastAPI.

Args:
password: Plain text password

Returns:
Hashed password string
"""
return password_hash.hash(password)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use hash_password and verify_password from mindtrace.core now.
There is also a convenience util verify_and_maybe_upgrade which should be useful for login flows.

Comment on lines +760 to +791
def get_auth_dependency(self, scope: Scope):
"""Get authentication dependency based on scope for this service instance.

Args:
scope: The endpoint scope (PUBLIC or AUTHENTICATED)

Returns:
Security dependency for AUTHENTICATED scope, None for PUBLIC
"""
if scope == Scope.AUTHENTICATED:
return Security(self._create_verify_token_dependency())
return None

def get_current_user_dependency(self):
"""Get a FastAPI dependency for accessing the current authenticated user.


Returns:
A FastAPI dependency function that returns the current user dict

Example:
get_current_user = service.get_current_user_dependency()

async def my_endpoint(
self,
current_user: Annotated[dict, Depends(get_current_user)]
):
user_id = current_user["user_id"]
# Use user data for authorization, etc.
"""
return self._create_get_user_dependency()

Copy link
Contributor

@uzairali19 uzairali19 Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are creating a new HTTPBearer() instance, inside both _create_verify_token_dependency() and _create_get_user_dependency().

Since get_auth_dependency() and get_current_user_dependency() call these methods during route registration, this results in multiple security scheme instances being created and duplicated entries in the OpenAPI specification.

We should create a single HTTPBearer instance (for example, in __init__ or set_user_authenticator()) and reuse it across.

fastapi/fastapi#2750

)

try:
result = authenticator(token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I have an endpoint that uses both, the scope as Scope.AUTHENTICATED and Depends(get_current_user), the authenticator function is executed twice per request:
• once via the route-level Security dependency
• once via the Depends(get_current_user) injection

Both are calling the authenticator independently, and they don't share the state. FastAPI’s dependency caching does not apply here because these are different dependency functions.

This results in duplicate JWT verification/database lookups per request and could be avoided by sharing or caching the authentication result per request.

https://fastapi.tiangolo.com/tutorial/dependencies/#caching-dependencies

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. The fastapi link shared here has no section #caching-dependencies. AI-generated link?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is interesting indeed 😁
There isn’t a public changelog for documentation anchors.

The behavior I’m referring to is still documented under “Using the same dependency multiple times”:
https://fastapi.tiangolo.com/tutorial/dependencies/sub-dependencies/#using-the-same-dependency-multiple-times

FastAPI caches dependencies per request only when the same dependency callable is reused.
Using both Security(...) and Depends(get_current_user) creates dependency entries, so the auth logic runs twice.

The behavior itself hasn't changed


service.set_user_authenticator(authenticate_user) # or authenticate_user_lightweight or authenticate_user_async
"""
self.user_authenticator = authenticator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over here, the set_user_authenticator() directly assigns self.user_authenticator with no safeguards.

If this method is called again at runtime (or set to None), the service's authentication behaviour would silently change or break.

We should restrict this to one-time initialization at startup or guard it against reassignment after service is launched.

https://owasp.org/www-project-web-security-testing-guide/latest/4-Web_Application_Security_Testing/02-Configuration_and_Deployment_Management_Testing/

Comment on lines +752 to +756
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token verification failed",
headers={"WWW-Authenticate": "Bearer"},
) from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over here, even if the HTTP response body is sanitized, the original exception is still attached as __cause__ and can be exploited by logging, middleware, or a monitoring system.

If e contains sensitive information (such as database or infrastructure details), this could unintentionally leak through logs or error pipelines. If full suppression is intended, from None would be safer.

https://owasp.org/www-community/Improper_Error_Handling

headers={"WWW-Authenticate": "Bearer"},
)

token = credentials.credentials
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad, we are sending raw credentials.credentials value without any validation directly to the authenticator.

I could give this some random large strings on the Authentication header, potentially triggering expensive JWT parsing or database lookups and creating a denial-of-service vector.

We should add some basic safeguards, max length format validation, before invoking the authenticator.

https://owasp.org/www-community/attacks/Denial_of_Service

Comment on lines +734 to +738
if user_info is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="User authenticator returned None. get_current_user_dependency() requires an authenticator that returns user data (dict).",
headers={"WWW-Authenticate": "Bearer"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are exposing the internal method name here and the implementation details. Is this intended? Why are we doing this?

We should have a generic internal error message for this case

https://owasp.org/www-community/Improper_Error_Handling

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should enforce Authentication for the MCP tools as well.

When this is called on the service level, the tool will bypass the authentication regardless of the scope parameter.

Tools are externally callable as well

# All subsequent requests will include the Authorization header
result = cm.some_endpoint(...)
"""
self._default_headers.update(headers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is updating, not setting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add authenticated endpoints to the services module

3 participants