Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ dependencies = [
"cachetools>=6.1.0",
"prometheus-client>=0.22.1",
"starlette>=0.47.1",
"aiohttp>=3.12.14",
"authlib>=1.6.0",
]

[tool.pyright]
Expand Down Expand Up @@ -53,6 +55,7 @@ dev = [
"ruff>=0.11.13",
"aiosqlite",
"behave>=1.2.6",
"types-cachetools>=6.1.0.20250717",
]
build = [
"build>=1.2.2.post1",
Expand Down
9 changes: 7 additions & 2 deletions src/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging

from auth.interface import AuthInterface
from auth import noop, noop_with_token, k8s
from auth import noop, noop_with_token, k8s, jwk_token
from configuration import configuration
import constants

Expand All @@ -15,7 +15,7 @@ def get_auth_dependency(
virtual_path: str = constants.DEFAULT_VIRTUAL_PATH,
) -> AuthInterface:
"""Select the configured authentication dependency interface."""
module = configuration.authentication_configuration.module # pyright: ignore
module = configuration.authentication_configuration.module

logger.debug(
"Initializing authentication dependency: module='%s', virtual_path='%s'",
Expand All @@ -32,6 +32,11 @@ def get_auth_dependency(
)
case constants.AUTH_MOD_K8S:
return k8s.K8SAuthDependency(virtual_path=virtual_path)
case constants.AUTH_MOD_JWK_TOKEN:
return jwk_token.JwkTokenAuthDependency(
configuration.authentication_configuration.jwk_configuration,
virtual_path=virtual_path,
)
case _:
err_msg = f"Unsupported authentication module '{module}'"
logger.error(err_msg)
Expand Down
191 changes: 191 additions & 0 deletions src/auth/jwk_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""Manage authentication flow for FastAPI endpoints with JWK based JWT auth."""

import logging
from asyncio import Lock
from typing import Any, Callable

from fastapi import Request, HTTPException, status
from authlib.jose import JsonWebKey, KeySet, jwt, Key
from authlib.jose.errors import (
BadSignatureError,
DecodeError,
ExpiredTokenError,
JoseError,
)
from cachetools import TTLCache
import aiohttp

from constants import (
DEFAULT_VIRTUAL_PATH,
)
from auth.interface import AuthInterface
from auth.utils import extract_user_token
from models.config import JwkConfiguration

logger = logging.getLogger(__name__)

# Global JWK registry to avoid re-fetching JWKs for each request. Cached for 1
# hour, keys are unlikely to change frequently.
_jwk_cache: TTLCache[str, KeySet] = TTLCache(maxsize=3, ttl=3600)
# Ideally this would be an RWLock, but it would require adding a dependency on
# aiorwlock
_jwk_cache_lock = Lock()


async def get_jwk_set(url: str) -> KeySet:
"""Fetch the JWK set from the cache, or fetch it from the URL if not cached."""
async with _jwk_cache_lock:
if url not in _jwk_cache:
async with aiohttp.ClientSession() as session:
# TODO(omertuc): handle connection errors, timeouts, etc.
async with session.get(url) as resp:
resp.raise_for_status()
_jwk_cache[url] = JsonWebKey.import_key_set(await resp.json())
return _jwk_cache[url]


class KeyNotFoundError(Exception):
"""Exception raised when a key is not found in the JWK set based on kid/alg."""


def key_resolver_func(
jwk_set: KeySet,
) -> Callable[[dict[str, Any], dict[str, Any]], Key]:
"""
Create a key resolver function.

Return a function to find a key in the given jwk_set. The function matches the
signature expected by the jwt.decode key kwarg.
"""

def _internal(header: dict[str, Any], _payload: dict[str, Any]) -> Key:
"""Match kid and alg from the JWT header to the JWK set.

Resolve the key from the JWK set based on the JWT header. Also
match the algorithm to make sure the algorithm stated by the user
is the same algorithm the key itself expects.

# We intentionally do not use find_by_kid because it's a bad function
# that doesn't take the alg into account
"""
if "alg" not in header:
raise KeyNotFoundError("Token header missing 'alg' field")

if "kid" in header:
keys = [key for key in jwk_set.keys if key.kid == header.get("kid")]

if len(keys) == 0:
raise KeyNotFoundError(
"No key found matching kid and alg in the JWK set"
)

if len(keys) > 1:
# This should never happen! Bad JWK set!
raise KeyNotFoundError(
"Internal server error, multiple keys found matching this kid"
)

key = keys[0]

if key["alg"] != header["alg"]:
raise KeyNotFoundError(
"Key found by kid does not match the algorithm in the token header"
)

return key

# No kid in the token header, we will try to find a key by alg
keys = [key for key in jwk_set.keys if key["alg"] == header["alg"]]

if len(keys) == 0:
raise KeyNotFoundError("No key found matching alg in the JWK set")

# Token has no kid and even we have more than one key with this algorithm - we will
# return the first key which matches the algorithm, hopefully it will
# match the token, but if not, unlucky - we're not going to brute-force all
# keys until we find the one that matches, that makes us more vulnerable to DoS
return keys[0]

return _internal


class JwkTokenAuthDependency(AuthInterface): # pylint: disable=too-few-public-methods
"""JWK AuthDependency class for JWK-based JWT authentication."""

def __init__(
self, config: JwkConfiguration, virtual_path: str = DEFAULT_VIRTUAL_PATH
) -> None:
"""Initialize the required allowed paths for authorization checks."""
self.virtual_path: str = virtual_path
self.config: JwkConfiguration = config

async def __call__(self, request: Request) -> tuple[str, str, str]:
"""Authenticate the JWT in the headers against the keys from the JWK url."""
user_token = extract_user_token(request.headers)

jwk_set = await get_jwk_set(str(self.config.url))

try:
claims = jwt.decode(user_token, key=key_resolver_func(jwk_set))
except KeyNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token: signed by unknown key or algorithm mismatch",
) from exc
except BadSignatureError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token: bad signature",
) from exc
except DecodeError as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token: decode error",
) from exc
except JoseError as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token: unknown error",
) from exc
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error",
) from exc

try:
claims.validate()
except ExpiredTokenError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired"
) from exc
except JoseError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Error validating token",
) from exc
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error during token validation",
) from exc

try:
user_id: str = claims[self.config.jwt_configuration.user_id_claim]
except KeyError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token missing claim: {self.config.jwt_configuration.user_id_claim}",
) from exc

try:
username: str = claims[self.config.jwt_configuration.username_claim]
except KeyError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token missing claim: {self.config.jwt_configuration.username_claim}",
) from exc

logger.info("Successfully authenticated user %s (ID: %s)", username, user_id)

return user_id, username, user_token
7 changes: 6 additions & 1 deletion src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ def mcp_servers(self) -> list[ModelContextProtocolServer]:
return self._configuration.mcp_servers

@property
def authentication_configuration(self) -> Optional[AuthenticationConfiguration]:
def authentication_configuration(self) -> AuthenticationConfiguration:
"""Return authentication configuration."""
assert (
self._configuration is not None
), "logic error: configuration is not loaded"

assert (
self._configuration.authentication is not None
), "logic error: authentication configuration is not loaded"

return self._configuration.authentication

@property
Expand Down
4 changes: 4 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
AUTH_MOD_K8S = "k8s"
AUTH_MOD_NOOP = "noop"
AUTH_MOD_NOOP_WITH_TOKEN = "noop-with-token"
AUTH_MOD_JWK_TOKEN = "jwk-token"
# Supported authentication modules
SUPPORTED_AUTHENTICATION_MODULES = frozenset(
{
AUTH_MOD_K8S,
AUTH_MOD_NOOP,
AUTH_MOD_NOOP_WITH_TOKEN,
AUTH_MOD_JWK_TOKEN,
}
)
DEFAULT_AUTHENTICATION_MODULE = AUTH_MOD_NOOP
DEFAULT_JWT_UID_CLAIM = "user_id"
DEFAULT_JWT_USER_NAME_CLAIM = "username"

# Data collector constants
DATA_COLLECTOR_COLLECTION_INTERVAL = 7200 # 2 hours in seconds
Expand Down
32 changes: 32 additions & 0 deletions src/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,28 @@ def check_storage_location_is_set_when_needed(self) -> Self:
return self


class JwtConfiguration(BaseModel):
"""JWT configuration."""

user_id_claim: str = constants.DEFAULT_JWT_UID_CLAIM
username_claim: str = constants.DEFAULT_JWT_USER_NAME_CLAIM


class JwkConfiguration(BaseModel):
"""JWK configuration."""

url: AnyHttpUrl
jwt_configuration: JwtConfiguration = JwtConfiguration()


class AuthenticationConfiguration(BaseModel):
"""Authentication configuration."""

module: str = constants.DEFAULT_AUTHENTICATION_MODULE
skip_tls_verification: bool = False
k8s_cluster_api: Optional[AnyHttpUrl] = None
k8s_ca_cert_path: Optional[FilePath] = None
jwk_config: Optional[JwkConfiguration] = None

@model_validator(mode="after")
def check_authentication_model(self) -> Self:
Expand All @@ -164,8 +179,25 @@ def check_authentication_model(self) -> Self:
f"Unsupported authentication module '{self.module}'. "
f"Supported modules: {supported_modules}"
)

if self.module == constants.AUTH_MOD_JWK_TOKEN:
if self.jwk_config is None:
raise ValueError(
"JWK configuration must be specified when using JWK token authentication"
)

return self

@property
def jwk_configuration(self) -> JwkConfiguration:
"""Return JWK configuration if the module is JWK token."""
if self.module != constants.AUTH_MOD_JWK_TOKEN:
raise ValueError(
"JWK configuration is only available for JWK token authentication module"
)
assert self.jwk_config is not None, "JWK configuration should not be None"
return self.jwk_config


class Customization(BaseModel):
"""Service customization."""
Expand Down
Loading