uno is a comprehensive application framework for building data-driven applications with PostgreSQL and FastAPI. Despite its name, uno is NOT an ORM - it's a complete framework that goes well beyond traditional ORMs to provide a unified approach to database operations, API definition, and business logic.
The name "uno" (Spanish for "one") represents the unified nature of the framework, bringing together database, API, and business logic in a cohesive but loosely coupled system.
The name "uno" is also, of course, a cheeky homage to gnu.
- Domain-Driven Design: Support for building applications using DDD principles with entities, value objects, aggregates, and repositories
- Event-Driven Architecture: Built-in event system for loosely coupled components that communicate through domain events
- Modern Dependency Injection: Protocol-based DI system with proper scoping and lifecycle management, following modern Python design patterns
- CQRS Pattern: Separation of command (write) and query (read) operations for better performance and scalability
- Async-First Architecture: Enhanced async utilities for robust concurrent operations with proper cancellation handling
- Resource Management: Comprehensive resource lifecycle management with connection pooling and health monitoring
- Unified Database Management: Centralized approach to database connection management with support for both synchronous and asynchronous operations
- SQL Generation: Powerful SQL emitters for creating and managing database objects
- API Integration: FastAPI endpoint factory with automatic dependency injection
- Schema Management: Advanced schema generation and validation
- Business Logic Layer: Clean separation of business logic from database operations
- Authorization System: Built-in user and permission management
- Advanced Filtering: Dynamic query building with support for complex filters
- High-Performance Batch Operations: Optimized batch processing for efficient database operations
- Workflow Management: Support for complex business workflows and state transitions
- Metadata Management: Track relationships between entities
- PostgreSQL Integration: Leverages PostgreSQL-specific features like JSONB, ULID, and row-level security
- Vector Search: Built-in vector similarity search using pgvector with automatic embedding generation
- Graph Database: Integrated graph database capabilities with the AGE extension
- Hybrid Search: Combine vector similarity with graph relationships for powerful contextual search
- Functional Error Handling: Result pattern for handling errors without exceptions
- Advanced Security: Comprehensive security framework with encryption, MFA, audit logging, and security testing
- Field-Level Encryption: Automatic encryption of sensitive data fields with key management
- Multi-Factor Authentication: Built-in TOTP-based MFA and password policy enforcement
- Audit Logging: Immutable, tamper-evident logging of all security-relevant events
pip install uno# Import the new domain-driven design components
from uno.core import (
AggregateEntity, BaseDomainEvent,
BaseCommand, CommandBus, command_handler,
AbstractUnitOfWork, Success, Failure
)
from uno.dependencies import get_service_provider, singleton, inject_params
# Define a domain event
class UserCreatedEvent(BaseDomainEvent):
user_id: str
email: str
@property
def aggregate_id(self) -> str:
return self.user_id
# Define a domain entity
class User(AggregateEntity[str]):
def __init__(self, id: str, email: str, handle: str, full_name: str):
super().__init__(id=id)
self.email = email
self.handle = handle
self.full_name = full_name
def validate_email(self) -> bool:
if "@" not in self.email:
return False
return True
@classmethod
def create(cls, id: str, email: str, handle: str, full_name: str) -> "User":
user = cls(id, email, handle, full_name)
if not user.validate_email():
raise ValueError("Invalid email format")
# Register a domain event
user.register_event(UserCreatedEvent(
user_id=id,
email=email
))
return user
# Define a command
class CreateUserCommand(BaseCommand):
email: str
handle: str
full_name: str
# Define a command handler
@singleton
@command_handler(CreateUserCommand)
class CreateUserCommandHandler:
def __init__(self, unit_of_work: AbstractUnitOfWork):
self.unit_of_work = unit_of_work
async def handle(self, command: CreateUserCommand):
try:
# Create user entity
user_id = generate_id()
user = User.create(
id=user_id,
email=command.email,
handle=command.handle,
full_name=command.full_name
)
# Use unit of work to manage transaction
async with self.unit_of_work:
user_repo = self.unit_of_work.get_repository(UserRepository)
await user_repo.save(user)
return Success(user)
except Exception as e:
return Failure(e)
# FastAPI endpoint with dependency injection
from fastapi import APIRouter
from uno.dependencies.fastapi_integration import DIAPIRouter
# Create a router with automatic dependency injection
router = DIAPIRouter(prefix="/users", tags=["users"])
@router.post("/")
@inject_params()
async def create_user(
command: CreateUserCommand,
command_bus: CommandBus
):
result = await command_bus.dispatch(command)
if result.is_success:
user = result.value
return {"id": user.id, "email": user.email}
else:
return {"error": str(result.error)}, 400# Import the new async utilities
from uno.core.async import TaskGroup, timeout, AsyncLock
from uno.core.async_integration import cancellable, retry, timeout_handler
from uno.core.async_manager import get_async_manager, as_task
from uno.database.enhanced_session import enhanced_async_session, SessionOperationGroup
# Properly handled async task with cancellation, retries, and timeouts
@cancellable
@retry(max_attempts=3)
@timeout_handler(timeout_seconds=5.0)
async def fetch_data(data_id: str):
# The operation is automatically:
# - Cancellable (with cleanup)
# - Retryable (up to 3 times)
# - Time-limited (5 second timeout)
# Use enhanced async session
async with enhanced_async_session() as session:
# Perform database query with proper cancellation handling
result = await session.execute(f"SELECT * FROM data WHERE id = '{data_id}'")
return result.fetchone()
# Run concurrent tasks with proper structured concurrency
async def process_multiple_items(items):
results = []
# Use a task group for structured concurrency
async with TaskGroup(name="process_items") as group:
# Create tasks for each item
tasks = [
group.create_task(fetch_data(item["id"]), name=f"fetch_{i}")
for i, item in enumerate(items)
]
# Process results as they complete
for task in tasks:
try:
result = await task
results.append(result)
except Exception as e:
logger.error(f"Error processing item: {e}")
# All tasks are guaranteed to be completed or cancelled here
return results
# Register with the application lifecycle
async def startup():
# Register application startup
manager = get_async_manager()
# Start background tasks
manager.create_task(background_monitoring(), name="monitoring")
# Use the async manager to run the application
if __name__ == "__main__":
import asyncio
from uno.core.async_manager import run_application
# Run the application with proper lifecycle management
asyncio.run(run_application(startup_func=startup))# Import the resource management utilities
from uno.core.resources import CircuitBreaker, get_resource_registry
from uno.core.resource_management import (
get_resource_manager,
managed_connection_pool,
managed_background_task,
)
from uno.core.resource_monitor import get_resource_monitor
from uno.database.pooled_session import pooled_async_session
from uno.core.fastapi_integration import (
setup_resource_management,
create_health_endpoint,
)
# Create a FastAPI application with resource management
from fastapi import FastAPI, Depends
# Create the app
app = FastAPI()
# Set up resource management
setup_resource_management(app)
# Add health check endpoint
create_health_endpoint(app)
# Use pooled sessions for database access
@app.get("/example")
async def example_endpoint():
async with pooled_async_session() as session:
# Connection is from pool and managed with circuit breaker
result = await session.execute("SELECT 1")
return {"result": result.scalar()}
# Use a circuit breaker for external services
async def call_external_api():
# Create a circuit breaker
circuit = CircuitBreaker(
name="api_circuit",
failure_threshold=5,
recovery_timeout=30.0,
)
# Register with resource registry
registry = get_resource_registry()
await registry.register("api_circuit", circuit)
# Use the circuit breaker
async def api_call():
# Make the actual API call
return {"status": "success"}
# Call through the circuit breaker
return await circuit(api_call)
# Create a managed background task
async def setup_background_tasks():
async def monitoring_task():
while True:
# Monitor system health
await asyncio.sleep(60)
# Create and register the task
async with managed_background_task(
"monitoring",
monitoring_task,
restart_on_failure=True,
) as task:
# Task is running and will be properly managed
pass
# Use the resource manager to create pooled database connections
async def initialize_database():
# Get the resource manager
manager = get_resource_manager()
# Create database connection pools
pools = await manager.create_database_pools()
# Create pooled session factory
session_factory = await manager.create_session_factory()
return session_factory
# Register startup and shutdown with FastAPI
@app.on_event("startup")
async def startup():
# Initialize resources
await get_resource_manager().initialize()
# Set up background tasks
await setup_background_tasks()
# Initialize database
await initialize_database()
# Monitor resource health
@app.get("/health")
async def health_check():
# Get the resource monitor
monitor = get_resource_monitor()
# Get health summary
return await monitor.get_health_summary()# Import batch operations components
from uno.queries import BatchOperations, BatchConfig, BatchExecutionStrategy, BatchSize
from uno.domain.repository import UnoDBRepository
# Create repository with batch operations enabled
repo = UnoDBRepository(
entity_type=Product,
use_batch_operations=True,
batch_size=500,
)
# Batch get products
product_ids = ["prod-001", "prod-002", "prod-003", ..., "prod-999"]
products = await repo.batch_get(
ids=product_ids,
load_relations=["category", "reviews"],
parallel=True,
)
# Update multiple products in batch
for product in products:
if product.stock_level < 10:
product.status = "low_stock"
product.updated_at = datetime.utcnow()
# Batch update with a single database operation
updated_count = await repo.batch_update(
entities=products,
fields=["status", "updated_at"],
)
print(f"Updated {updated_count} products")
# Direct use of batch operations for advanced scenarios
batch_ops = BatchOperations(
model_class=Order,
batch_config=BatchConfig(
batch_size=BatchSize.LARGE.value,
execution_strategy=BatchExecutionStrategy.PARALLEL,
max_workers=4,
collect_metrics=True,
),
)
# Import orders with validation and duplicate handling
orders_data = load_orders_from_csv("orders.csv")
import_result = await batch_ops.batch_import(
records=orders_data,
unique_fields=["order_number"],
update_on_conflict=True,
return_stats=True,
)
print(f"Imported {import_result['inserted']} orders, updated {import_result['updated']}")# Get the service provider
provider = get_service_provider()
# Define a query using the CQRS pattern
class DocumentSearchQuery(BaseQuery):
query_text: str
limit: int = 10
threshold: float = 0.7
metric: str = "cosine"
# Define a query handler
@singleton
@query_handler(DocumentSearchQuery)
class DocumentSearchQueryHandler:
def __init__(self, vector_search_factory):
self.vector_search_factory = vector_search_factory
async def handle(self, query: DocumentSearchQuery):
try:
# Get vector search service for documents
document_search = self.vector_search_factory.create_search_service(
entity_type="document",
table_name="documents"
)
# Perform a search
results = await document_search.search(query)
return Success(results)
except Exception as e:
return Failure(e)
# Use in an API endpoint
@router.get("/search")
@inject_params()
async def search_documents(
query_text: str,
limit: int = 10,
threshold: float = 0.7,
query_bus: QueryBus = None
):
query = DocumentSearchQuery(
query_text=query_text,
limit=limit,
threshold=threshold
)
result = await query_bus.dispatch(query)
if result.is_success:
return [
{
"id": item.id,
"similarity": item.similarity,
"title": item.entity.title,
"content": item.entity.content[:100] + "..." if len(item.entity.content) > 100 else item.entity.content
}
for item in result.value
]
else:
return {"error": str(result.error)}, 400# Import the security components
from uno.security import SecurityManager
from uno.security.config import SecurityConfig
from uno.security.audit import SecurityEvent
from uno.security.encryption import FieldEncryption
# Create security configuration
config = SecurityConfig(
encryption={
"algorithm": "AES_GCM",
"key_management": "LOCAL", # For development
"field_level_encryption": True,
"encrypted_fields": ["password", "credit_card", "ssn"]
},
authentication={
"enable_mfa": True,
"mfa_type": "TOTP",
"session_timeout_minutes": 60,
"password_policy": {
"level": "STRICT",
"min_length": 16
}
},
audit={
"enabled": True,
"storage": {
"type": "database",
"connection": "postgresql://user:pass@localhost/db",
},
"retention_days": 365
}
)
# Create security manager
security = SecurityManager(config)
# Field-level encryption in model
class User(AggregateEntity[str]):
def __init__(self, id: str, email: str, ssn: str):
super().__init__(id=id)
self.email = email
self._ssn = security.encrypt_field("ssn", ssn) # Encrypted field
@property
def ssn(self) -> str:
# Decrypt when accessed
return security.decrypt_field("ssn", self._ssn)
# MFA implementation in API
@router.post("/mfa/setup")
@inject_params()
async def setup_mfa(
user_id: str,
security_manager: SecurityManager
):
# Set up MFA for user
setup_info = await security_manager.setup_mfa(user_id)
# Return setup information including QR code
return {
"secret": setup_info.secret,
"qr_code": setup_info.qr_code_uri,
"instructions": setup_info.instructions
}
@router.post("/mfa/verify")
@inject_params()
async def verify_mfa(
user_id: str,
code: str,
security_manager: SecurityManager
):
# Verify MFA code
is_valid = await security_manager.verify_mfa(user_id, code)
if is_valid:
# Log successful verification
security_manager.audit.log(
event_type=SecurityEvent.MFA_VERIFICATION,
user_id=user_id,
metadata={"success": True}
)
return {"verified": True}
else:
# Log failed verification
security_manager.audit.log(
event_type=SecurityEvent.MFA_VERIFICATION,
user_id=user_id,
metadata={"success": False}
)
return {"verified": False}
# Security testing in CI/CD pipeline
@task(name="security-scan")
async def run_security_scan():
from uno.security.testing import SecurityScanner
# Create scanner
scanner = SecurityScanner()
# Run dependency scan
vulnerabilities = await scanner.scan_dependencies()
# Run static analysis
security_issues = await scanner.run_static_analysis()
# Generate report
report = scanner.generate_report(
vulnerabilities=vulnerabilities,
security_issues=security_issues
)
# Check if any critical issues
if report.has_critical_issues():
print("Critical security issues found!")
print(report.critical_issues_summary())
return False
return TrueWe follow a Docker-first approach for all environments. The easiest way to get started is:
# Set up Docker and run the application
hatch run dev:app
# Or just set up the Docker environment
hatch run dev:docker-setupThis will create a PostgreSQL 16 container with all required extensions, including pgvector for vector search and AGE for graph database capabilities.
See DOCKER_FIRST.md for more details on our Docker-first approach.
uno is built on a modular architecture with several core components:
-
Domain Layer: Implements domain-driven design (DDD) principles
AggregateEntity: Base class for aggregate rootsBaseDomainEvent: Base class for domain eventsValueObject: Base class for immutable value objectsRepository: Protocol for data access abstraction
-
Application Layer: Implements application services and CQRS
CommandBus: Dispatches commands to their handlersQueryBus: Dispatches queries to their handlersUnitOfWork: Manages transaction boundaries
-
Async Layer: Implements robust async patterns
TaskManager: Manages async tasks with proper cancellationAsyncLock/Semaphore/Event: Enhanced concurrency primitivesTaskGroup: Structured concurrency for related tasksAsyncBatcher: Batches async operations for efficiencyAsyncCache: Provides async-aware caching with TTL
-
Resource Management Layer: Manages application resources
ResourceRegistry: Centralized registry for tracked resourcesConnectionPool: Pooled connections with health checkingCircuitBreaker: Circuit breaker for external service reliabilityResourceMonitor: Monitoring and health checking for resourcesResourceManager: Application lifecycle integration
-
Data Layer: Manages database connections, schema definition, and data operations
UnoModel: SQLAlchemy-based model for defining database tablesDatabaseFactory: Centralized factory for creating database connectionsSQL Emitters: Components that generate SQL for various database objectsEnhancedAsyncSession: Improved async session with robust error handlingPooledAsyncSession: Connection pooling with circuit breakers
-
API Layer: Exposes functionality through REST endpoints
DIAPIRouter: FastAPI-based router with dependency injectionEndpointFactory: Automatically generates endpoints from objectsFilter Manager: Handles query parameters and filtering
-
Infrastructure Layer: Provides cross-cutting concerns
Dependency Injection: Protocol-based DI with proper scopingEvent Bus: Publishes and subscribes to domain eventsConfiguration: Environment-aware configuration systemAsyncManager: Coordinates async resources throughout the application
src/uno/
├── __init__.py
├── core/ # Core domain-driven design components
│ ├── protocols.py # Interface protocols
│ ├── domain.py # DDD building blocks
│ ├── events.py # Event-driven architecture
│ ├── cqrs.py # CQRS pattern
│ ├── uow.py # Unit of Work pattern
│ ├── result.py # Result pattern for error handling
│ ├── async/ # Async utilities
│ │ ├── task_manager.py # Task management
│ │ ├── concurrency.py # Enhanced concurrency primitives
│ │ └── context.py # Context management
│ ├── async_integration.py # Async integration utilities
│ ├── async_manager.py # Central async resource manager
│ ├── resources.py # Resource management components
│ ├── resource_monitor.py # Resource monitoring
│ ├── resource_management.py # Resource lifecycle management
│ ├── fastapi_integration.py # FastAPI integration for resources
│ └── config.py # Configuration management
├── api/ # API components
│ ├── endpoint.py # Base endpoint definition
│ └── endpoint_factory.py # Factory for creating API endpoints
├── dependencies/ # Dependency injection
│ ├── scoped_container.py # DI container with scoping
│ ├── decorators.py # DI decorators
│ ├── fastapi_integration.py # FastAPI integration
│ └── discovery.py # Automatic service discovery
├── database/ # Database components
│ ├── config.py # Connection configuration
│ ├── db.py # Database operations
│ ├── enhanced_db.py # Enhanced database operations
│ ├── enhanced_session.py # Enhanced session management
│ ├── pooled_session.py # Pooled session management
│ └── engine/ # Database engine management
│ ├── async.py # Async engine
│ ├── enhanced_async.py # Enhanced async engine
│ ├── pooled_async.py # Pooled async engine
│ ├── base.py # Base engine factory
│ └── sync.py # Synchronous engine
├── model.py # SQL Alchemy model base
├── queries/ # Query components
│ ├── filter.py # Filter definitions
│ ├── filter_manager.py # Query filtering
│ ├── optimized_queries.py # Optimized query building and execution
│ ├── common_patterns.py # Common query patterns with optimizations
│ ├── batch_operations.py # High-performance batch operations
│ └── executor.py # Query execution engine
├── schema/ # Schema components
│ ├── schema.py # Schema definitions
│ └── schema_manager.py # Schema management
├── sql/ # SQL generation
│ ├── emitter.py # Base SQL emitter
│ └── emitters/ # Specialized emitters
├── domain/ # Domain-specific components
│ ├── service_example.py # Example domain service
│ └── api_example.py # Example API endpoints
├── security/ # Security framework
│ ├── config.py # Security configuration
│ ├── manager.py # Security manager
│ ├── encryption/ # Encryption components
│ │ ├── aes.py # AES encryption
│ │ ├── rsa.py # RSA encryption
│ │ └── field.py # Field-level encryption
│ ├── auth/ # Authentication components
│ │ ├── totp.py # TOTP-based MFA
│ │ ├── password.py # Password management
│ │ └── sso.py # Single sign-on
│ ├── audit/ # Audit logging
│ │ ├── event.py # Security event definitions
│ │ └── logger.py # Audit logger
│ └── testing/ # Security testing tools
│ ├── scanner.py # Vulnerability scanner
│ ├── dependency.py # Dependency security
│ └── static.py # Static analysis
└── vector_search/ # Vector search components
├── services.py # Vector search services
├── events.py # Event handlers for vector updates
└── models.py # Vector query and result models
- Python 3.12+
- Docker and Docker Compose
- All PostgreSQL dependencies are handled by Docker
- No local PostgreSQL installation needed
Comprehensive documentation is available in the /docs directory and can be built using MkDocs:
# Build documentation
python src/scripts/generate_docs.py --mkdocs
# Serve documentation locally
python src/scripts/generate_docs.py --mkdocs --serveFor more information on the documentation, see:
# Setup Docker for testing and run tests
hatch run test:all
# Just set up the Docker test environment
hatch run test:docker-setup
# Run tests (after Docker setup)
hatch run test:test
# Run tests with more details
hatch run test:testvv
# Run integration tests
hatch run test:integration
# Run integration tests with vector search components
hatch run test:integration-vector
# Run performance benchmarks for integration tests
hatch run test:benchmarks
# Type checking
hatch run types:checkThe test suite includes comprehensive integration tests that verify component interactions in a real-world environment. These tests cover:
- Core infrastructure components (migrations, connection pooling, transactions)
- Authentication and authorization (JWT, RBAC, permissions)
- Data processing features (vector search, batch operations, query optimization)
- Distributed systems features (caching, error handling)
Each test verifies that components work together correctly with real infrastructure (PostgreSQL, Redis) through Docker.
# Run all benchmarks
hatch run test:benchmark
# Run specific module benchmarks
hatch run test:benchmark tests/benchmarks/test_database_performance.py
# Run integration test benchmarks
cd tests/integration
./run_benchmarks.py --output benchmark_results.json --csv
# Compare benchmark results with previous run
./run_benchmarks.py --compare previous_results.json
# View dashboard for all benchmark results
cd benchmarks/dashboard
./run_dashboard.shThe benchmark infrastructure includes:
- Comprehensive Benchmarks: Performance tests for all critical components
- Integration Test Benchmarks: Performance tests for component interactions
- Benchmark Runner: Tools for running benchmarks and comparing results
- Dashboard: Visualization and analysis of benchmark results
The dashboard provides performance comparison, trend analysis, and scaling visualization. It integrates results from both unit-level and integration-level benchmarks. See docs/benchmarks/dashboard.md for more details.
# Build documentation
hatch run docs:build
# Serve documentation locally
hatch run docs:serveuno is distributed under the terms of the MIT license.