Skip to content

Loading Fast Admin on Flask #111

@eneadodi

Description

@eneadodi

Console.log shows:

GET http://localhost:5000/admin/static/index.min.js net::ERR_ABORTED 404 (NOT FOUND)Understand this error
admin-tables/:20

GET http://localhost:5000/admin/static/index.min.css net::ERR_ABORTED 404 (NOT FOUND)

with app_initialize_admin:
def app_initialize_admin(app):
    """Configure FastAdmin through environment variables and mount it on *app*.

    The ``ADMIN_*`` environment variables are written first, then the admin
    model module is imported (for its registration side effect), and finally
    the FastAdmin Flask blueprint is registered under ``/admin-tables``.

    Args:
        app: The Flask application; must provide ``register_blueprint`` and
            ``logger``.
    """
    import os
    import sys

    from app.core.config import get_settings

    # Single source of truth for the mount point so that the ADMIN_PREFIX
    # variable, the static asset paths and the blueprint URL cannot drift.
    admin_prefix = "admin-tables"
    url_prefix = f"/{admin_prefix}"

    # The symptom in this report (assets requested from /admin/static/...
    # instead of /admin-tables/static/...) is what one would see if the
    # variables below were ignored.
    # NOTE(review): presumably fastadmin captures ADMIN_* when it is first
    # imported - confirm against the installed version. If so, any earlier
    # import (e.g. a module-level `from fastadmin import ...`) makes the
    # assignments below too late, so surface that loudly.
    if "fastadmin" in sys.modules:
        app.logger.warning(
            "fastadmin was imported before app_initialize_admin() ran; "
            "ADMIN_* environment variables set here may be ignored"
        )

    settings = get_settings()

    # Required settings
    os.environ["ADMIN_USER_MODEL"] = settings.ADMIN_USER_MODEL
    os.environ["ADMIN_USER_MODEL_USERNAME_FIELD"] = settings.ADMIN_USER_MODEL_USERNAME_FIELD
    os.environ["ADMIN_SECRET_KEY"] = str(settings.SECRET_KEY.get_secret_value())

    # The prefix must match the URL the blueprint is mounted at.
    os.environ["ADMIN_PREFIX"] = admin_prefix

    # Static file paths live under the same prefix.
    os.environ["ADMIN_SITE_SIGN_IN_LOGO"] = f"{url_prefix}/static/images/sign-in-logo.svg"
    os.environ["ADMIN_SITE_HEADER_LOGO"] = f"{url_prefix}/static/images/header-logo.svg"
    os.environ["ADMIN_SITE_FAVICON"] = f"{url_prefix}/static/images/favicon.png"

    # Optional settings
    os.environ["ADMIN_SITE_NAME"] = settings.ADMIN_SITE_NAME

    # Imported only for its side effect: the module registers the admin model.
    from app.api.v1.BlueprintAdmin.models.user_authentication_admin_models import UserAdmin  # noqa: F401

    # Import the FastAdmin blueprint only after the environment is prepared.
    from fastadmin import flask_app as admin_app

    # Register the FastAdmin blueprint with the Flask app.
    app.register_blueprint(admin_app, url_prefix=url_prefix)

    app.logger.info("FastAdmin initialized and mounted at %s", url_prefix)

and UserAdmin:

import typing as tp
import uuid
import asyncio
from sqlalchemy import select, update, delete, func, or_
from fastadmin import SqlAlchemyModelAdmin, register, WidgetType
from sqlalchemy.inspection import inspect
from fastadmin.models.schemas import ModelFieldWidgetSchema

# Import the async_sessionmaker you created

from app.database.async_db import async_sessionmaker
from app.api.v1 import User, Role

# Admin configuration for the User model, registered with FastAdmin and bound
# to the project's async sessionmaker.
@register(User, sqlalchemy_sessionmaker=async_sessionmaker)
class UserAdmin(SqlAlchemyModelAdmin):
# Display configuration - adjusted for your User model
# NOTE(review): "hashed_password" is excluded here but is also referenced in
# formfield_overrides and in the first fieldset below - confirm which of the
# two is intended, since an excluded field will presumably never be rendered.
exclude = ("hashed_password",)
# Columns shown in the list view; "id" and "email" link to the detail form.
list_display = ("id", "email", "is_active", "auth_provider", "is_verified", "clearance")
list_display_links = ("id", "email")
list_filter = ("is_active", "auth_provider", "is_verified", "clearance")
search_fields = ("email",)

# Field customization
# Each entry maps a field name to a (widget type, widget props) pair.
formfield_overrides = {
    "email": (WidgetType.EmailInput, {"required": True}),
    "hashed_password": (WidgetType.PasswordInput, {"passwordModalForm": True})
}

# Add sections to form
# Each entry is (section title or None, {"fields": (...)}).
fieldsets = (
    (None, {"fields": ("email", "hashed_password", "auth_provider")}),
    ("Status", {"fields": ("is_active", "is_verified", "clearance")}),
    ("Subscription", {"fields": ("has_subscribed", "stripe_customer_id")}),
)

@staticmethod
def get_model_pk_name(orm_model_cls):
    return "id"  # Your primary key field name

def get_model_fields_with_widget_types(
    self,
    with_m2m: bool | None = None,
    with_upload: bool | None = None,
):
    """Describe the model's columns (and optionally ``roles``) as widget schemas.

    Args:
        with_m2m: When truthy, a synthetic many-to-many ``roles`` field is
            appended after the column fields.
        with_upload: Accepted for interface compatibility; not used here.

    Returns:
        A list of ``ModelFieldWidgetSchema`` objects, one per mapped column
        that is not listed in ``self.exclude``, plus ``roles`` if requested.
    """
    # Exact Python type -> widget; anything else falls back to a text input.
    widget_by_python_type = {
        bool: WidgetType.Switch,
        int: WidgetType.InputNumber,
        # Add more mappings as needed
    }

    fields = []
    mapper = inspect(self.model_cls).mapper

    # Process regular columns
    for column in mapper.columns:
        field_name = column.name
        # Skip excluded fields
        if field_name in self.exclude:
            continue

        if field_name in self.formfield_overrides:
            # An explicit override wins; only the widget type (first tuple
            # element) is consumed here.
            widget_type = self.formfield_overrides[field_name][0]
        else:
            # `python_type` is a property that raises NotImplementedError for
            # SQLAlchemy types without a Python equivalent. `hasattr` only
            # swallows AttributeError, so probe it explicitly and fall back
            # to the default widget instead of crashing the whole form.
            try:
                py_type = column.type.python_type
            except (NotImplementedError, AttributeError):
                py_type = None
            widget_type = widget_by_python_type.get(py_type, WidgetType.Input)

        # NOTE(review): confirm these keyword arguments match the fields of
        # ModelFieldWidgetSchema in the installed fastadmin version.
        field = ModelFieldWidgetSchema(
            name=field_name,
            column_name=field_name,
            form_widget_type=widget_type,
            is_m2m=False,
            is_upload=False,
            required=not column.nullable,
        )
        fields.append(field)

    # Add M2M fields if requested
    if with_m2m:
        # Handle roles as a special case
        field = ModelFieldWidgetSchema(
            name="roles",
            column_name="roles",  # This will be handled specially in orm_get_m2m_ids and orm_save_m2m_ids
            form_widget_type=WidgetType.Select,
            is_m2m=True,
            is_upload=False,
            required=False,
        )
        fields.append(field)

    return fields

# Authentication method - works with your role system
async def authenticate(self, username: str, password: str) -> uuid.UUID | None:
    """Return the user's id if the credentials belong to an admin, else None.

    The user is looked up by email among active, verified accounts. The
    password check and the "admin" role check both run in a worker thread,
    in that order; the first failure ends the attempt.
    """
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        lookup = select(self.model_cls).filter_by(
            email=username,
            is_active=True,
            is_verified=True,
        )
        account = (await db.execute(lookup)).scalar_one_or_none()
        if not account:
            return None

        # verify_password is synchronous, so it is pushed to a thread.
        password_ok = await asyncio.to_thread(account.verify_password, password)
        if not password_ok:
            return None

        # NOTE(review): if has_role touches a lazily loaded relationship this
        # may not work under an async session - confirm how roles are loaded.
        is_admin = await asyncio.to_thread(account.has_role, "admin")
        return account.id if is_admin else None

async def change_password(self, id: uuid.UUID, password: str) -> None:
    """Assign a new password to the user with the given id and commit.

    Does nothing when no such user exists.
    """
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        found = await db.execute(select(self.model_cls).filter_by(id=id))
        account = found.scalar_one_or_none()
        if account:
            # Assigning to `password` goes through the model's own setter
            # (presumably where hashing happens); run it off the event loop.
            await asyncio.to_thread(setattr, account, 'password', password)
            await db.commit()

# Required ORM methods
async def orm_get_list(
    self,
    offset: int | None = None,
    limit: int | None = None,
    search: str | None = None,
    sort_by: str | None = None,
    filters: dict | None = None,
) -> tuple[list[tp.Any], int]:
    """Return one page of model rows together with the unpaginated count.

    Args:
        offset: Rows to skip, if given.
        limit: Maximum rows to return, if given.
        search: Substring matched case-insensitively against every field in
            ``self.search_fields`` (conditions are OR-ed).
        sort_by: Attribute name to order by; a leading ``-`` means descending.
        filters: Mapping of attribute name to the exact value required.

    Returns:
        ``(rows, total)`` where ``total`` counts the rows matching search and
        filters before pagination is applied.
    """
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        model = self.model_cls
        stmt = select(model)

        # Search: unknown attribute names are silently skipped.
        if search and self.search_fields:
            matchers = [
                col.ilike(f"%{search}%")
                for name in self.search_fields
                if (col := getattr(model, name, None)) is not None
            ]
            if matchers:
                stmt = stmt.filter(or_(*matchers))

        # Equality filters: unknown attribute names are silently skipped.
        for name, wanted in (filters or {}).items():
            col = getattr(model, name, None)
            if col is not None:
                stmt = stmt.filter(col == wanted)

        # Count before ordering/pagination so the total covers every match.
        total = await db.scalar(select(func.count()).select_from(stmt.subquery()))

        if sort_by:
            descending = sort_by.startswith("-")
            sort_col = getattr(model, sort_by[1:] if descending else sort_by, None)
            if sort_col is not None:
                stmt = stmt.order_by(sort_col.desc() if descending else sort_col)

        if offset is not None:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        rows = (await db.execute(stmt)).scalars().all()
        return list(rows), total

async def orm_get_obj(self, id: uuid.UUID) -> tp.Any | None:
    """Fetch the model row whose ``id`` matches, or None if there is none."""
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        found = await db.execute(select(self.model_cls).filter_by(id=id))
        return found.scalar_one_or_none()

async def orm_save_obj(self, id: uuid.UUID | None, payload: dict) -> tp.Any:
    """Create (``id is None``) or update a row from *payload* and return it.

    On update, only keys that already exist as attributes on the row are
    applied. Returns None when the row to update cannot be found.
    """
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        if id is not None:
            # Update path: load the row, then copy known attributes over.
            found = await db.execute(select(self.model_cls).filter_by(id=id))
            record = found.scalar_one_or_none()
            if not record:
                return None
            for attr, new_value in payload.items():
                if hasattr(record, attr):
                    setattr(record, attr, new_value)
        else:
            # Create path: the payload is passed straight to the constructor.
            record = self.model_cls(**payload)
            db.add(record)

        await db.commit()
        # Reload so the returned object reflects what is stored after commit.
        await db.refresh(record)
        return record

async def orm_delete_obj(self, id: uuid.UUID) -> None:
    """Delete the row with the given ``id`` and commit."""
    open_session = self.get_sessionmaker()
    async with open_session() as db:
        await db.execute(delete(self.model_cls).filter_by(id=id))
        await db.commit()

# M2M handling for roles
async def orm_get_m2m_ids(self, obj: tp.Any, field: str) -> list[uuid.UUID]:
    """Return the ids of ``obj.roles`` for the ``roles`` field, else ``[]``."""
    if field != "roles":
        return []

    # `roles` is read in a worker thread because it is a synchronous attribute.
    # NOTE(review): if `roles` is lazily loaded and `obj` is detached from its
    # session, this access may fail - confirm the relationship's loading mode.
    assigned = await asyncio.to_thread(getattr, obj, "roles")
    return [role.id for role in assigned]

async def orm_save_m2m_ids(self, obj: tp.Any, field: str, ids: list[uuid.UUID]) -> None:
    """Replace the role assignments of *obj* with the roles named by *ids*.

    Only the ``roles`` field is handled; any other field is a no-op. All
    existing ``RoleUser`` rows for the user are deleted and one new row is
    inserted per role that exists, in a single commit.
    """
    if field != "roles":
        return

    open_session = self.get_sessionmaker()
    async with open_session() as db:
        # Resolve the ids to Role rows; ids without a matching role drop out.
        wanted = await db.execute(select(Role).filter(Role.id.in_(ids)))
        selected_roles = list(wanted.scalars().all())

        from app.api.v1 import RoleUser

        # Wipe the current assignments before writing the new set.
        await db.execute(delete(RoleUser).where(RoleUser.user_id == obj.id))

        # Simplification kept from the original: the user is recorded as
        # the assigner of their own roles.
        db.add_all(
            [
                RoleUser(
                    user_id=obj.id,
                    role_id=role.id,
                    assigned_by_id=obj.id,
                )
                for role in selected_roles
            ]
        )

        await db.commit()

async def orm_save_upload_field(self, obj: tp.Any, field: str, base64: str) -> None:
    """Store an uploaded value for ``profile_picture_url``; ignore other fields.

    The *base64* string is written to the column as-is, with no decoding or
    file storage.
    """
    if field != "profile_picture_url":
        return

    open_session = self.get_sessionmaker()
    async with open_session() as db:
        model = self.model_cls
        stmt = update(model).where(model.id == obj.id).values(**{field: base64})
        await db.execute(stmt)
        await db.commit()

I don't use index.min.css or index.min.js. But regardless, I don't think I should see an empty page on 'admin-tables/'.

Any ideas?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions