-
Notifications
You must be signed in to change notification settings - Fork 30
Description
Console.log shows:
GET http://localhost:5000/admin/static/index.min.js net::ERR_ABORTED 404 (NOT FOUND)
admin-tables/:20
GET http://localhost:5000/admin/static/index.min.css net::ERR_ABORTED 404 (NOT FOUND)
with app_initialize_admin:
def app_initialize_admin(app, prefix="admin-tables"):
    """Configure FastAdmin via environment variables and mount it on ``app``.

    The ``ADMIN_*`` environment variables are written first, then the admin
    model module and the FastAdmin Flask blueprint are imported, and finally
    the blueprint is registered on ``app`` under ``/<prefix>``.

    Args:
        app: Flask application that receives the FastAdmin blueprint.
        prefix: URL path segment under which the admin is served. Leading and
            trailing slashes are ignored. Defaults to ``"admin-tables"``.
    """
    import os

    from app.core.config import get_settings

    settings = get_settings()

    # Derive every prefix-dependent value from one place so the env var, the
    # asset URLs, the blueprint mount point and the log line cannot drift.
    prefix = prefix.strip("/")
    url_prefix = f"/{prefix}"
    images_url = f"{url_prefix}/static/images"

    # NOTE(review): the reported 404s target /admin/static/... rather than
    # /<prefix>/static/..., which suggests a default "admin" prefix was still
    # in effect when the page was rendered. Presumably FastAdmin reads the
    # ADMIN_* variables when it is first imported, so confirm that nothing
    # imports ``fastadmin`` before this function runs.

    # Required settings
    os.environ["ADMIN_USER_MODEL"] = settings.ADMIN_USER_MODEL
    os.environ["ADMIN_USER_MODEL_USERNAME_FIELD"] = settings.ADMIN_USER_MODEL_USERNAME_FIELD
    os.environ["ADMIN_SECRET_KEY"] = str(settings.SECRET_KEY.get_secret_value())

    # The prefix and the static asset paths must agree with the mount point
    os.environ["ADMIN_PREFIX"] = prefix
    os.environ["ADMIN_SITE_SIGN_IN_LOGO"] = f"{images_url}/sign-in-logo.svg"
    os.environ["ADMIN_SITE_HEADER_LOGO"] = f"{images_url}/header-logo.svg"
    os.environ["ADMIN_SITE_FAVICON"] = f"{images_url}/favicon.png"

    # Optional settings
    os.environ["ADMIN_SITE_NAME"] = settings.ADMIN_SITE_NAME

    # Imported only for its side effect: the module applies @register to the
    # admin model classes.
    from app.api.v1.BlueprintAdmin.models.user_authentication_admin_models import UserAdmin  # noqa: F401

    # Import the FastAdmin blueprint only after the environment is prepared
    from fastadmin import flask_app as admin_app

    app.register_blueprint(admin_app, url_prefix=url_prefix)
    app.logger.info("FastAdmin initialized and mounted at %s", url_prefix)
and UserAdmin:
import typing as tp
import uuid
import asyncio
from sqlalchemy import select, update, delete, func, or_
from fastadmin import SqlAlchemyModelAdmin, register, WidgetType
from sqlalchemy.inspection import inspect
from fastadmin.models.schemas import ModelFieldWidgetSchema
# Import the async_sessionmaker you created
from app.database.async_db import async_sessionmaker
from app.api.v1 import User, Role
@register(User, sqlalchemy_sessionmaker=async_sessionmaker)
class UserAdmin(SqlAlchemyModelAdmin):
    """FastAdmin model admin for ``User``.

    Declares the list/form configuration and overrides authentication,
    password change, CRUD, many-to-many (``roles``) and upload handling with
    queries issued through the session maker returned by
    ``self.get_sessionmaker()``.
    """

    # Display configuration
    # NOTE(review): "hashed_password" is excluded here but is still referenced
    # by ``formfield_overrides`` and ``fieldsets`` below - confirm which one
    # is intended.
    exclude = ("hashed_password",)
    list_display = ("id", "email", "is_active", "auth_provider", "is_verified", "clearance")
    list_display_links = ("id", "email")
    list_filter = ("is_active", "auth_provider", "is_verified", "clearance")
    search_fields = ("email",)

    # Field customization: field name -> (widget type, widget props)
    formfield_overrides = {
        "email": (WidgetType.EmailInput, {"required": True}),
        "hashed_password": (WidgetType.PasswordInput, {"passwordModalForm": True}),
    }

    # Form sections
    fieldsets = (
        (None, {"fields": ("email", "hashed_password", "auth_provider")}),
        ("Status", {"fields": ("is_active", "is_verified", "clearance")}),
        ("Subscription", {"fields": ("has_subscribed", "stripe_customer_id")}),
    )

    @staticmethod
    def get_model_pk_name(orm_model_cls):
        """Return the primary-key attribute name, which is always ``"id"``."""
        return "id"

    @staticmethod
    def _default_widget_type(column):
        """Pick a widget for ``column`` from the Python type of its SQL type.

        Booleans map to ``Switch``, integers to ``InputNumber`` and everything
        else to ``Input``. SQLAlchemy types that do not declare a Python type
        raise ``NotImplementedError`` from ``python_type``; ``hasattr`` does
        not swallow that error, so it is handled explicitly here.
        """
        try:
            py_type = column.type.python_type
        except (AttributeError, NotImplementedError):
            return WidgetType.Input
        if py_type is bool:
            return WidgetType.Switch
        if py_type is int:
            return WidgetType.InputNumber
        return WidgetType.Input

    def get_model_fields_with_widget_types(
        self,
        with_m2m: bool | None = None,
        with_upload: bool | None = None,
    ):
        """Describe the model's columns (and optionally ``roles``) as widget schemas.

        Args:
            with_m2m: When truthy, append a ``roles`` many-to-many entry.
            with_upload: Accepted for signature compatibility; not used here.

        Returns:
            A list of ``ModelFieldWidgetSchema``, one per non-excluded mapped
            column, plus the ``roles`` entry when ``with_m2m`` is truthy.
        """
        # NOTE(review): confirm that ModelFieldWidgetSchema accepts the keyword
        # names used below (is_upload, required) in the installed FastAdmin
        # version. The widget props in ``formfield_overrides`` (second tuple
        # element) are not forwarded.
        fields = []
        mapper = inspect(self.model_cls).mapper

        # Regular columns
        for column in mapper.columns:
            field_name = column.name
            if field_name in self.exclude:
                continue

            # An explicit override wins over type-based detection
            if field_name in self.formfield_overrides:
                widget_type = self.formfield_overrides[field_name][0]
            else:
                widget_type = self._default_widget_type(column)

            fields.append(
                ModelFieldWidgetSchema(
                    name=field_name,
                    column_name=field_name,
                    form_widget_type=widget_type,
                    is_m2m=False,
                    is_upload=False,
                    required=not column.nullable,
                )
            )

        # Roles are exposed as the single many-to-many field; the ids are read
        # and written by orm_get_m2m_ids / orm_save_m2m_ids.
        if with_m2m:
            fields.append(
                ModelFieldWidgetSchema(
                    name="roles",
                    column_name="roles",
                    form_widget_type=WidgetType.Select,
                    is_m2m=True,
                    is_upload=False,
                    required=False,
                )
            )
        return fields

    async def authenticate(self, username: str, password: str) -> uuid.UUID | None:
        """Authenticate an admin user by email and password.

        Returns:
            The user's id when an active, verified user with email
            ``username`` exists, the password verifies and the user has the
            ``"admin"`` role; otherwise ``None``.
        """
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            query = select(self.model_cls).filter_by(
                email=username,
                is_active=True,
                is_verified=True,
            )
            result = await session.execute(query)
            user = result.scalar_one_or_none()
            if not user:
                return None

            # verify_password is synchronous, so it runs in a worker thread
            is_valid = await asyncio.to_thread(user.verify_password, password)
            if not is_valid:
                return None

            # NOTE(review): if has_role triggers a lazy relationship load, doing
            # so from a worker thread on an async-session object may fail -
            # confirm roles are eagerly loaded or that has_role does no I/O.
            has_admin = await asyncio.to_thread(user.has_role, "admin")
            if not has_admin:
                return None
            return user.id

    async def change_password(self, id: uuid.UUID, password: str) -> None:
        """Set a new password on the user with the given id and commit.

        Does nothing when no such user exists.
        """
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            result = await session.execute(select(self.model_cls).filter_by(id=id))
            user = result.scalar_one_or_none()
            if not user:
                return
            # Assign through the model's ``password`` attribute (presumably a
            # property setter that hashes the value) in a worker thread.
            await asyncio.to_thread(setattr, user, "password", password)
            await session.commit()

    async def orm_get_list(
        self,
        offset: int | None = None,
        limit: int | None = None,
        search: str | None = None,
        sort_by: str | None = None,
        filters: dict | None = None,
    ) -> tuple[list[tp.Any], int]:
        """Return one page of objects together with the total match count.

        Args:
            offset: Number of rows to skip, if given.
            limit: Maximum number of rows to return, if given.
            search: Substring matched case-insensitively against every
                attribute named in ``search_fields``.
            sort_by: Attribute name to order by; a leading ``-`` sorts
                descending.
            filters: Mapping of attribute name to the value it must equal.

        Returns:
            ``(objects, total)`` where ``total`` is counted before sorting
            and pagination are applied.
        """
        # NOTE(review): sort_by and filter keys are resolved with getattr, so
        # non-column attributes are not rejected - confirm callers only pass
        # column names.
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            query = select(self.model_cls)

            # Search
            if search and self.search_fields:
                search_conditions = []
                for field in self.search_fields:
                    column = getattr(self.model_cls, field, None)
                    if column is not None:
                        search_conditions.append(column.ilike(f"%{search}%"))
                if search_conditions:
                    query = query.filter(or_(*search_conditions))

            # Equality filters
            if filters:
                for field, value in filters.items():
                    column = getattr(self.model_cls, field, None)
                    if column is not None:
                        query = query.filter(column == value)

            # Total count of the filtered set
            count_query = select(func.count()).select_from(query.subquery())
            total = await session.scalar(count_query) or 0

            # Sorting
            if sort_by:
                descending = sort_by.startswith("-")
                if descending:
                    sort_by = sort_by[1:]
                column = getattr(self.model_cls, sort_by, None)
                if column is not None:
                    query = query.order_by(column.desc() if descending else column)

            # Pagination
            if offset is not None:
                query = query.offset(offset)
            if limit is not None:
                query = query.limit(limit)

            result = await session.execute(query)
            objects = result.scalars().all()
            return list(objects), total

    async def orm_get_obj(self, id: uuid.UUID) -> tp.Any | None:
        """Return the object with the given id, or ``None`` if it does not exist."""
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            result = await session.execute(select(self.model_cls).filter_by(id=id))
            return result.scalar_one_or_none()

    async def orm_save_obj(self, id: uuid.UUID | None, payload: dict) -> tp.Any | None:
        """Create (``id`` is ``None``) or update an object from ``payload``.

        On update, only payload keys that already exist as attributes on the
        object are assigned.

        Returns:
            The refreshed object, or ``None`` when updating an id that does
            not exist.
        """
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            if id is None:
                obj = self.model_cls(**payload)
                session.add(obj)
            else:
                result = await session.execute(select(self.model_cls).filter_by(id=id))
                obj = result.scalar_one_or_none()
                if not obj:
                    return None
                for key, value in payload.items():
                    if hasattr(obj, key):
                        setattr(obj, key, value)
            await session.commit()
            await session.refresh(obj)
            return obj

    async def orm_delete_obj(self, id: uuid.UUID) -> None:
        """Delete the row with the given id and commit."""
        sessionmaker = self.get_sessionmaker()
        async with sessionmaker() as session:
            await session.execute(delete(self.model_cls).filter_by(id=id))
            await session.commit()

    async def orm_get_m2m_ids(self, obj: tp.Any, field: str) -> list[uuid.UUID]:
        """Return the ids of ``obj.roles`` for ``field == "roles"``, else ``[]``."""
        if field == "roles":
            # NOTE(review): ``obj`` may come from a session that is already
            # closed; if ``roles`` loads lazily this access can fail - confirm.
            roles = await asyncio.to_thread(getattr, obj, "roles")
            return [role.id for role in roles]
        return []

    async def orm_save_m2m_ids(self, obj: tp.Any, field: str, ids: list[uuid.UUID]) -> None:
        """Replace the role assignments of ``obj`` with the roles in ``ids``.

        Only ``field == "roles"`` is handled. Existing ``RoleUser`` rows for
        the user are deleted and one new row per matching role is inserted,
        all committed together.
        """
        if field == "roles":
            sessionmaker = self.get_sessionmaker()
            async with sessionmaker() as session:
                # Keep only ids that correspond to an existing Role
                result = await session.execute(select(Role).filter(Role.id.in_(ids)))
                roles = list(result.scalars().all())

                # Drop every current assignment first
                from app.api.v1 import RoleUser

                await session.execute(delete(RoleUser).where(RoleUser.user_id == obj.id))

                # Simplified: the edited user is recorded as the assigner
                for role in roles:
                    session.add(
                        RoleUser(
                            user_id=obj.id,
                            role_id=role.id,
                            assigned_by_id=obj.id,
                        )
                    )
                await session.commit()

    async def orm_save_upload_field(self, obj: tp.Any, field: str, base64: str) -> None:
        """Store an uploaded value for ``profile_picture_url``; ignore other fields.

        The base64 payload is written to the column as-is.
        """
        if field == "profile_picture_url":
            sessionmaker = self.get_sessionmaker()
            async with sessionmaker() as session:
                query = update(self.model_cls).where(self.model_cls.id == obj.id).values(**{field: base64})
                await session.execute(query)
                await session.commit()
I don't use index.min.css or index.min.js. Regardless, I don't think I should see an empty page at 'admin-tables/'.
Any ideas?