A repository library that wraps SQLAlchemy and provides built-in CRUD and a query DSL.
Once you inherit from BaseRepository, you can use create/read/update/delete right away.
- No need to re-implement simple CRUD over and over.
- Mypy-friendly. Provides generic-aware return types.
- If you need custom repo methods, you can add them in your project and extend freely.
- Supports introducing a Mapper for conversions between Pydantic schemas and SQLAlchemy ORM models.
The table below lists the minimum required versions for each supported dependency.
To see more about how to test and the result.
| python_version | sqlalchemy | pydantic |
|---|---|---|
| 3.10 | >= 1.4 | >= 1.10 |
| >= 3.13 | >= 1.4 | >= 2.8, 1.7-1.10 |
If you want a working FastAPI example that uses this library end-to-end:
- FastAPI blog API template: https://github.com/4jades/fastapi-baserepository-template
pip install base-repositoryfrom dataclasses import dataclass
from pydantic import BaseModel
from base_repository import BaseRepoFilter, BaseRepository
# Example: existing SQLAlchemy ORM model and Pydantic schema in your project
class Base(DeclarativeBase):
pass
class UserModel(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(nullable=False)
class UserSchema(BaseModel):
id: int
name: str
model_config = ConfigDict(from_attributes=True)
# What you need to add for Base Repository: Filter + Repo
@dataclass
class UserFilter(BaseRepoFilter):
id: int | iterable[int] | None = None
name: str | iterable[str] | None = None
class UserRepo(BaseRepository[UserModel, UserSchema]): # Optional: return Pydantic objects by default
filter_class = UserFilter
# If you don't need schema mapping
class UserRepo(BaseRepository[UserModel]):
filter_class = UserFilter
# If you need mapper
class UserRepo(BaseRepository[UserModel, UserSchema]):
filter_class = UserFilter
mapper = UsermapperDecide how the repository should obtain an AsyncSession.
Recommended: inject a SessionProvider once, so the repo can fetch sessions when needed.
from typing import Protocol
from base_repository import SessionProvider, BaseRepository
# Example SessionProvider implementation
class MysqlSessionProvider(SessionProvider):
def get_session(self) -> AsyncSession:
return Mysql.current_session()
BaseRepository.configure_session_provider(MysqlSessionProvider())Alternatively, you can pass a session directly when creating the repo.
async with AsyncSession(engine) as session:
repo = UserRepo(session)Alternatively, you can pass the session directly when calling a method.
await repo.create({"name": "Alice", "email": "a@test.com"}, session=session)repo = UserRepo()
# Get one
user = await repo.get(UserFilter(name=["Alice", "Bob"]))
user_orm = await repo.get(UserFilter(name="Alice"), convert_schema=False)
# Get one (required)
user2 = await repo.get_or_fail(UserFilter(id=1))
# Create (dict)
created = await repo.create({"name": "Alice", "email": "a@test.com"})
# Create (schema)
created2 = await repo.create(UserSchema(name="Bob", email="b@test.com"))
# Create (schema -> ORM)
created2_orm = await repo.create(
UserSchema(name="Bob", email="b@test.com"),
convert_schema=False,
)
# Create many
created_many = await repo.create_many(
[
{"name": "Alice", "email": "a@test.com"},
UserSchema(name="Bob", email="b@test.com"),
]
)
# Create many (skip schema conversion -> ORM list)
created_many_orm = await repo.create_many(
[
{"name": "Alice", "email": "a@test.com"},
{"name": "Bob", "email": "b@test.com"},
],
skip_convert=True,
)
# Create from ORM model (as-is)
created3 = await repo.create_from_model(UserModel(name="Chris", email="c@test.com"))
# List (OFFSET paging)
q = (
repo.list()
.where(UserFilter(name="A"))
.order_by(["id", "name"]) # order_by("id"), order_by(User.id), order_by(User.id.desc()) ..
.paging(page=1, size=20)
)
users = await repo.execute(q)
# List (CURSOR paging)
q1 = (
repo.list()
.order_by(["id", "name"]) # order_by("id"), order_by(User.id), order_by(User.id.desc()) ..
.with_cursor({}) # first page
.limit(20)
)
users1 = await repo.execute(q1)
# List (convenience: get_list)
users2 = await repo.get_list(flt=UserFilter(name="A"), order_by=["id", User.name], page=1, size=20)
# List (convenience: get_list + cursor)
users3 = await repo.get_list(order_by="id", cursor={}, size=20)
# Update / Delete / Count
cnt = await repo.count(UserFilter(name="Alice"))
updated_rows = await repo.update(UserFilter(name="Bob"), {"email": "bob@new"})
deleted_rows = await repo.delete(UserFilter(name="Alice"))
# Update (dirty checking)
base = await repo.get_or_fail(UserFilter(id=1), convert_schema=False)
after = await repo.update_from_model(base=base, update={"email": "new@test.com"}, convert_schema=True)
# add / add_all (caller controls flush/commit)
repo.add(User(name="D", email="d@test.com"))
repo.add_all([User(name="E", email="e@test.com"), User(name="F", email="f@test.com")])
await repo.session.flush()