Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 87 additions & 27 deletions api/apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@

from flask_mail import Mail
from flask_session import Session
from flask_login import LoginManager
from flask_login import LoginManager, UserMixin
from common import settings
from api.utils.api_utils import server_error_response
from api.constants import API_VERSION
from flask import request as flask_request
from api.db.db_models import APIToken

__all__ = ["app"]

Expand Down Expand Up @@ -142,40 +144,98 @@ def register_page(page_path):
]


@login_manager.request_loader
def load_user(web_request):
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = web_request.headers.get("Authorization")
if authorization:
try:
access_token = str(jwt.loads(authorization))
class DefaultUser(UserMixin):
def __init__(self, tenant_id: str):
self.tenant_id = tenant_id

if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return None

# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return None
def load_user_from_jwt(authorization):
"""
Load user from JWT token for web UI authentication.

Args:
authorization: The authorization info

Returns:
User object if JWT is valid, None otherwise
"""

try:
jwt = Serializer(secret_key=settings.SECRET_KEY)
access_token = str(jwt.loads(authorization))

user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return None
return user[0]
else:
if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return None

# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return None

user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return None
except Exception as e:
logging.warning(f"load_user got exception {e}")
return user[0]
else:
return None
else:
except Exception as e:
logging.warning(f"JWT authentication failed: {e}")
return None


def load_user_from_api_key(authorization):
"""
Load user from API Key for external API authentication.

Args:
authorization: The authorization info

Returns:
User object if API Key is valid, None otherwise
"""
try:
if os.environ.get("DISABLE_SDK"):
return None
authorization_str = flask_request.headers.get("Authorization")
if not authorization_str:
return None
authorization_list = authorization_str.split()
if len(authorization_list) < 2:
return None
token = authorization_list[1]
objs = APIToken.query(token=token)
if not objs:
return None

default_user = DefaultUser(objs[0].tenant_id)
return default_user
except Exception as e:
logging.warning(f"API Key authentication failed: {e}")
return None


@login_manager.request_loader
def load_user(web_request):
authorization = web_request.headers.get("Authorization")
if authorization is None:
return None

user = load_user_from_jwt(authorization)
if user:
return user

user = load_user_from_api_key(authorization)
if user:
return user

return None


@app.teardown_request
def _db_close(exc):
close_connection()