Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions mindtrace/database/mindtrace/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
from mindtrace.database.backends.mindtrace_odm import InitMode, MindtraceODM
from mindtrace.database.backends.mongo_odm import MindtraceDocument, MongoMindtraceODM
from mindtrace.database.backends.redis_odm import MindtraceRedisDocument, RedisMindtraceODM
from mindtrace.database.backends.registry_odm import RegistryMindtraceODM
from mindtrace.database.backends.unified_odm import (
BackendType,
UnifiedMindtraceDocument,
UnifiedMindtraceODM,
)
from mindtrace.database.core.exceptions import DocumentNotFoundError, DuplicateInsertError
from mindtrace.database.core.exceptions import DocumentNotFoundError, DuplicateInsertError, QueryNotSupported

__all__ = [
"BackendType",
Expand All @@ -19,11 +18,12 @@
"DocumentNotFoundError",
"DuplicateInsertError",
"Link",
"RegistryMindtraceODM",
"MindtraceDocument",
"MindtraceRedisDocument",
"MongoMindtraceODM",
"RedisMindtraceODM",
"UnifiedMindtraceDocument",
"UnifiedMindtraceODM",
"QueryNotSupported",
]

251 changes: 94 additions & 157 deletions mindtrace/database/mindtrace/database/backends/mindtrace_odm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod
from enum import Enum
from typing import Type
from typing import Any, Literal, Type

from pydantic import BaseModel

Expand All @@ -15,198 +15,135 @@ class InitMode(Enum):


class MindtraceODM(MindtraceABC):
"""Portable ODM contract for all backends (Mongo, Redis, ...).

Canonical query-style API
-------------------------
insert_one(doc)
find(where=None, sort=None, limit=None)
find_one(where, sort=None)
update_one(where, set_fields, upsert=False, return_document="none")
delete_one(where)
delete_many(where)
distinct(field, where=None)

Portable filter subset (required across all backends)
-----------------------------------------------------
Equality: {"field": value}
List-as-IN: {"field": [v1, v2, ...]}
OR: {"$or": [clause1, clause2, ...]}

Any unsupported filter shape MUST raise ``QueryNotSupported``.
Backends MUST NOT silently broaden queries (e.g. full scans).

Legacy compatibility API (id / full-document style)
---------------------------------------------------
insert, get, update, delete, all

These remain for backward compatibility but are not the preferred
path for new code. Registry backends should use only canonical methods.
"""
Abstract base class for all Mindtrace Object Document Mapping (ODM) backends.

This class defines the common interface that all database backends must implement
to provide consistent data persistence operations across different storage engines
like MongoDB, Redis, and local storage.

Example:
.. code-block:: python

from mindtrace.database.backends.mindtrace_odm import MindtraceODM

class CustomBackend(MindtraceODM):
def is_async(self) -> bool:
return False

def insert(self, obj):
# Implementation here
pass
"""
# ── async flag ──────────────────────────────────────────────────────

@abstractmethod
def is_async(self) -> bool:
"""
Determine if this backend operates asynchronously.

Returns:
bool: True if the backend uses async operations, False otherwise.
"""Return True if this backend uses async operations."""

Example:
.. code-block:: python
# ── canonical query-style API ───────────────────────────────────────

backend = SomeBackend()
if backend.is_async():
result = await backend.insert(document)
else:
result = backend.insert(document)
"""
def insert_one(self, doc: BaseModel | dict):
"""Insert one document. Returns the inserted document."""
return self.insert(doc)

@abstractmethod
def insert(self, obj: BaseModel):
"""
Insert a new document into the database.
def find(
self,
where: dict | None = None,
sort: list[tuple[str, int]] | None = None,
limit: int | None = None,
**kwargs,
) -> list[BaseModel]:
"""Find documents matching a portable filter.

Args:
obj (BaseModel): The document object to insert into the database.
where: Portable filter dict. Supported operators are equality,
list-as-IN, and ``$or``.
sort: List of ``(field, direction)`` pairs where direction is
1 (ascending) or -1 (descending).
limit: Maximum number of results.
**kwargs: Backend options (must not change filter semantics).

Returns:
The inserted document with any generated fields (like ID) populated.
list[BaseModel]: Matching documents.

Raises:
DuplicateInsertError: If the document violates unique constraints.

Example:
.. code-block:: python
QueryNotSupported: If the filter cannot be represented portably.
"""

from pydantic import BaseModel
def find_one(self, where: dict, **kwargs) -> BaseModel | None:
"""Find first document matching *where*. Returns None when empty."""
results = self.find(where=where, limit=1, **kwargs)
return results[0] if results else None

class User(BaseModel):
name: str
email: str
@abstractmethod
def update_one(
self,
where: dict,
set_fields: dict,
upsert: bool = False,
return_document: Literal["none", "before", "after"] = "none",
) -> Any:
"""Update exactly one matching document (partial field update).

user = User(name="John", email="john@example.com")
inserted_user = backend.insert(user)
Args:
where: Portable filter dict.
set_fields: Fields to update (``$set`` semantics).
upsert: Insert a new document when no match exists.
return_document:
``"none"`` - return a backend update-result object.
``"before"`` - return document snapshot before update (or None).
``"after"`` - return document snapshot after update (or None).
"""

@abstractmethod
def get(self, id: str) -> BaseModel:
"""
Retrieve a document by its unique identifier.
def delete_one(self, where: dict) -> int:
"""Delete exactly one matching document. Returns 0 or 1."""

Args:
id (str): The unique identifier of the document to retrieve.
@abstractmethod
def delete_many(self, where: dict) -> int:
"""Delete all matching documents. Returns deleted count."""

Returns:
BaseModel: The document if found.
@abstractmethod
def distinct(self, field: str, where: dict | None = None) -> list[Any]:
"""Return distinct values for *field* among documents matching *where*."""

Raises:
DocumentNotFoundError: If no document with the given ID exists.
# ── legacy compatibility API (id / full-document style) ─────────────
# Prefer canonical query-style methods for new code:
# insert_one / find / find_one / update_one / delete_one / delete_many / distinct

Example:
.. code-block:: python
@abstractmethod
def insert(self, obj: BaseModel):
"""Legacy: insert a document by object. Prefer ``insert_one``."""

try:
user = backend.get("user_123")
print(f"Found user: {user.name}")
except DocumentNotFoundError:
print("User not found")
"""
@abstractmethod
def get(self, id: str) -> BaseModel:
"""Legacy: retrieve a document by id. Prefer ``find_one``."""

@abstractmethod
def update(self, obj: BaseModel):
"""
Update an existing document in the database.

The document object should have been retrieved from the database,
modified, and then passed to this method to save the changes.

Args:
obj (BaseModel): The document object with modified fields to save.

Returns:
BaseModel: The updated document.

Raises:
DocumentNotFoundError: If the document doesn't exist in the database.

Example:
.. code-block:: python

# Get the document
user = backend.get("user_123")
# Modify it
user.age = 31
user.name = "John Updated"
# Save the changes
updated_user = backend.update(user)
"""
"""Legacy: full-document save by id/pk. Prefer ``update_one``."""

@abstractmethod
def delete(self, id: str):
"""
Delete a document by its unique identifier.

Args:
id (str): The unique identifier of the document to delete.

Raises:
DocumentNotFoundError: If no document with the given ID exists.

Example:
.. code-block:: python

try:
backend.delete("user_123")
print("User deleted successfully")
except DocumentNotFoundError:
print("User not found")
"""
"""Legacy: delete a document by id. Prefer ``delete_one``."""

@abstractmethod
def all(self) -> list[BaseModel]:
"""
Retrieve all documents from the collection.

Returns:
list[BaseModel]: A list of all documents in the collection.

Example:
.. code-block:: python
"""Legacy: retrieve all documents. Prefer ``find()``."""

all_users = backend.all()
print(f"Found {len(all_users)} users")
for user in all_users:
print(f"- {user.name}")
"""

@abstractmethod
def find(self, *args, **kwargs) -> list[BaseModel]:
"""
Find documents matching the specified criteria.

Args:
*args: Query conditions and filters. The exact format depends on the backend.
**kwargs: Additional query parameters.

Returns:
list[BaseModel]: A list of documents matching the query criteria.

Example:
.. code-block:: python

# Find users with specific email (backend-specific syntax)
users = backend.find(User.email == "john@example.com")

# Find all users if no criteria specified
all_users = backend.find()
"""
# ── introspection ───────────────────────────────────────────────────

@abstractmethod
def get_raw_model(self) -> Type[BaseModel]:
"""
Get the raw document model class used by this backend.

Returns:
Type[BaseModel]: The document model class used by this backend.
For backends that don't use a specific model class, this may
return the base BaseModel type or None.

Example:
.. code-block:: python

model_class = backend.get_raw_model()
print(f"Using model: {model_class.__name__}")
"""
"""Get the raw document model class used by this backend."""
Loading