A high-performance Python library for PostgreSQL UPSERT operations with intelligent conflict resolution using PostgreSQL temporary tables and atomic MERGE statements. Designed for reliability, data integrity, and modern Python development with comprehensive type safety.
- Temporary Table Staging: Uses PostgreSQL temporary tables for efficient, isolated staging and conflict analysis
- Atomic MERGE Operations: Single-transaction upserts using PostgreSQL 15+ MERGE statements for reliability and performance
- Multi-constraint Support: Handles primary keys, unique constraints, and composite constraints simultaneously
- Intelligent Conflict Resolution: Automatically filters ambiguous conflicts and deduplicates data
- Automatic NaN to NULL Conversion: Seamlessly converts pandas NaN/None values to PostgreSQL NULL values
- Schema Validation: Automatic table and column validation before operations
- Comprehensive Logging: Detailed debug information and progress tracking
- Modern Typing: Fully typed with Python 3.10+ type hints and strict mypy compliance
- Type Safety: 100% type coverage with comprehensive type annotations
Install with Poetry:

```bash
poetry add sqlalchemy-psql-upsert
```

Or with pip:

```bash
pip install sqlalchemy-psql-upsert
```

In addition to SELECT, INSERT, and UPDATE permissions on target tables, this library requires the PostgreSQL TEMPORARY privilege to function properly:
Why Temporary Tables?
- Isolation: Staging data doesn't interfere with production tables during analysis
- Performance: Bulk operations are faster on temporary tables
- Safety: Failed operations don't leave partial data in target tables
- Atomicity: The entire upsert operation happens in a single transaction, as the sketch below illustrates
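Conceptually, the staging-plus-merge flow looks like the following. This is a simplified sketch of the general pattern, not the library's internal code; the `users` table, its columns, and the connection string are placeholder assumptions:

```python
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")

# One transaction wraps the whole flow: if anything fails, nothing is written
with engine.begin() as conn:
    # Stage input rows in a session-local temp table, dropped on commit
    conn.execute(text(
        "CREATE TEMP TABLE tmp_users (LIKE users INCLUDING ALL) ON COMMIT DROP"
    ))
    conn.execute(
        text("INSERT INTO tmp_users (id, name) VALUES (:id, :name)"),
        [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
    )
    # Atomically merge staged rows into the target (PostgreSQL 15+)
    conn.execute(text("""
        MERGE INTO users AS t
        USING tmp_users AS s ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET name = s.name
        WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)
    """))
```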
Create a `.env` file or set the following environment variables:

```bash
# PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
```

```python
from sqlalchemy_psql_upsert import PgConfig
# Default configuration from environment
config = PgConfig()
# Manual configuration
config = PgConfig(
    host="localhost",
    port="5432",
    user="myuser",
    password="mypass",
    dbname="mydb"
)
print(config.uri())  # postgresql+psycopg2://myuser:mypass@localhost:5432/mydb
```

```python
from sqlalchemy_psql_upsert import test_connection
# Test default connection
success, message = test_connection()
if success:
    print("✅ Database connection OK")
else:
    print(f"❌ Connection failed: {message}")
```

Important: This library requires CREATE TEMP TABLE privileges to function properly. The client automatically verifies these privileges during initialization.

```python
from sqlalchemy_psql_upsert import PostgresqlUpsert, PgConfig
# Test connection and privileges
config = PgConfig()
try:
    # This will automatically test temp table privileges
    upserter = PostgresqlUpsert(config=config)
    print("✅ Connection and privileges verified successfully")
except PermissionError as e:
    print(f"❌ Privilege error: {e}")
    print("Solution: Grant temporary privileges with:")
    print("GRANT TEMPORARY ON DATABASE your_database TO your_user;")
except Exception as e:
    print(f"❌ Connection failed: {e}")
```
Grant Required Privileges:

```sql
-- As database administrator, grant temporary table privileges
GRANT TEMPORARY ON DATABASE your_database TO your_user;

-- Alternatively, grant more comprehensive privileges
GRANT CREATE ON DATABASE your_database TO your_user;
```

Basic Usage:

```python
import pandas as pd
from sqlalchemy_psql_upsert import PostgresqlUpsert, PgConfig
# Configure database connection
config = PgConfig() # Loads from environment variables
upserter = PostgresqlUpsert(config=config)
# Prepare your data
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com']
})

# Perform upsert
affected_rows = upserter.upsert_dataframe(
    dataframe=df,
    table_name='users',
    schema='public'
)
print(f"✅ Upserted {affected_rows} rows successfully")
# Get detailed information about skipped rows
affected_rows, skipped_df = upserter.upsert_dataframe(
    dataframe=df,
    table_name='users',
    schema='public',
    return_skipped=True
)
print(f"✅ Upserted {affected_rows} rows, {len(skipped_df)} rows skipped")

if not skipped_df.empty:
    print("Skipped rows:", skipped_df[['skip_reason']].value_counts())
```

Custom Engine:

```python
from sqlalchemy import create_engine
# Using custom SQLAlchemy engine
engine = create_engine('postgresql://user:pass@localhost:5432/mydb')
upserter = PostgresqlUpsert(engine=engine, debug=True)
# Upsert with custom schema
affected_rows = upserter.upsert_dataframe(
    dataframe=large_df,
    table_name='products',
    schema='inventory'
)
```

The library automatically handles pandas data type conversions:

```python
import pandas as pd
import numpy as np
# DataFrame with NaN values
df = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'name': ['Alice', 'Bob', None, 'David'],  # None values
    'score': [85.5, np.nan, 92.0, 88.1],      # NaN values
    'active': [True, False, None, True]       # Mixed types with None
})

# All NaN and None values are automatically converted to PostgreSQL NULL
upserter.upsert_dataframe(df, 'users')
# Result: NaN/None → NULL in PostgreSQL
```

This library uses a robust multi-step approach for reliable, high-performance upserts:
1. Temporary Table Staging
   - A temporary table is created with the same structure as the target table, plus a `skip_reason` column.
   - All input DataFrame rows are bulk inserted into this table.
2. Multi-Row Conflict Detection
   - Each row in the temp table is checked for conflicts against the target table's constraints (PK, unique, composite unique).
   - Rows that would conflict with more than one target row are marked with `skip_reason = 'multiple_target_conflicts'`.
3. Ranking and Deduplication
   - For rows that conflict with only a single target row, the library checks for duplicate source rows (multiple input rows targeting the same record).
   - Only the last row for each target is kept; the others are marked with `skip_reason = 'duplicate_source_rows'`.
4. Constructing the Clean Rows Table
   - A second temporary table (the "clean table") is created with the same structure as the target table.
   - All rows from the raw temp table where `skip_reason IS NULL` are copied into the clean table.
5. Merging the Clean Table into the Target Table
   - A single, atomic PostgreSQL `MERGE` statement is executed: if a row in the clean table matches a row in the target table (by any constraint), it is updated; if there is no match, it is inserted.
6. Fetching and Returning Conflicts
   - After the merge, all rows from the raw temp table where `skip_reason IS NOT NULL` are fetched and returned to the user, along with the reason they were skipped.

This process ensures that only valid, deduplicated rows are upserted, and that all conflicts are tracked and reported for further review.
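To make steps 2 and 3 concrete, here is conceptual SQL for conflict marking and deduplication over a hypothetical `users` target with primary key `id` and a unique `email` column; `src_row` is an assumed staging column that records input order. This illustrates the idea only, not the library's actual SQL:

```python
from sqlalchemy import text

# Step 2: a staged row that matches more than one target row is ambiguous
mark_ambiguous = text("""
    UPDATE tmp_users s
    SET skip_reason = 'multiple_target_conflicts'
    WHERE (SELECT count(*) FROM users t
           WHERE t.id = s.id OR t.email = s.email) > 1
""")

# Step 3: among duplicate source rows aimed at the same target, keep the last
mark_duplicates = text("""
    UPDATE tmp_users
    SET skip_reason = 'duplicate_source_rows'
    WHERE skip_reason IS NULL
      AND src_row NOT IN (
          SELECT max(src_row) FROM tmp_users
          WHERE skip_reason IS NULL
          GROUP BY id, email
      )
""")
```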
The library automatically analyzes your target table to identify:
- Primary key constraints: Single or composite primary keys
- Unique constraints: Single column unique constraints
- Composite unique constraints: Multi-column unique constraints
All constraint types are handled simultaneously in a single operation.
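You can see which constraints such an analysis picks up by inspecting the table with SQLAlchemy's inspector. A standalone sketch (table name and connection details are placeholders, and the sample outputs are illustrative):

```python
from sqlalchemy import create_engine, inspect

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")
insp = inspect(engine)

# Primary key constraint (single or composite)
print(insp.get_pk_constraint("users", schema="public"))
# e.g. {'constrained_columns': ['id'], 'name': 'users_pkey'}

# Unique constraints, including multi-column (composite) ones
print(insp.get_unique_constraints("users", schema="public"))
# e.g. [{'name': 'users_email_key', 'column_names': ['email']}]
```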
Performance Benefits:
- Single Transaction: Entire operation is atomic, no partial updates or race conditions
- Bulk Operations: High-performance bulk inserts into temporary tables
- Efficient Joins: PostgreSQL optimizes joins between temporary and main tables
- Minimal Locking: Temporary tables don't interfere with concurrent operations
Reliability Benefits:
- Comprehensive Conflict Resolution: Handles all constraint types simultaneously
- Deterministic Results: Same input always produces same output
- Automatic Cleanup: Temporary tables are automatically dropped
- ACID Compliance: Full transaction safety and rollback capability (see the sketch after this list)
Data Integrity Benefits:
- Ambiguity Detection: Automatically detects and skips problematic rows
- Deduplication: Handles duplicate input data intelligently
- Constraint Validation: PostgreSQL validates all constraints during MERGE
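Because the entire operation runs in a single transaction, a failed upsert leaves the target table exactly as it was. A hypothetical demonstration, reusing the `upserter` from earlier examples; the table, columns, and the cause of failure are assumptions:

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")

# Assume this row violates a constraint during the merge (e.g. NOT NULL email)
bad_df = pd.DataFrame({'id': [1], 'email': [None]})

with engine.connect() as conn:
    before = conn.execute(text("SELECT count(*) FROM users")).scalar()

try:
    upserter.upsert_dataframe(bad_df, 'users')
except Exception:
    pass  # the transaction was rolled back; nothing was written

with engine.connect() as conn:
    after = conn.execute(text("SELECT count(*) FROM users")).scalar()

assert before == after  # target table is unchanged
```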
Resource Requirements:
- Memory Usage: All input data is staged in temporary tables, which PostgreSQL buffers in memory (temp_buffers) and spills to temporary storage when large
- Temporary Space: Requires sufficient temporary storage for staging tables
- Single-threaded: No parallel processing (traded for reliability and simplicity)
PostgreSQL Specifics:
- Version Dependency: The MERGE statement requires PostgreSQL 15+ (a quick version check is sketched below)
- Session-based Temp Tables: Temporary tables are tied to database sessions
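If you're unsure what the server is running, you can check before attempting an upsert. A minimal sketch (connection string is a placeholder):

```python
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")

with engine.connect() as conn:
    # server_version_num is e.g. '150004' for PostgreSQL 15.4
    version_num = int(conn.execute(text("SHOW server_version_num")).scalar())

if version_num < 150000:
    raise RuntimeError("PostgreSQL 15+ is required for MERGE-based upserts")
```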
Privilege-related Limitations:
- Database Administrator: May need DBA assistance to grant `TEMPORARY` privileges
- Shared Hosting: Some cloud providers restrict temporary table creation
- Security Policies: Corporate environments may restrict temporary table usage
Scale Considerations:
- Large Dataset Handling: Very large datasets (>1M rows) may require memory tuning
- Transaction Duration: Entire operation happens in one transaction (longer lock times)
Memory Management:
```python
# For large datasets, monitor memory usage
import pandas as pd
# Consider chunking very large datasets manually if needed
def chunk_dataframe(df, chunk_size=50000):
    for start in range(0, len(df), chunk_size):
        yield df[start:start + chunk_size]

# Process in manageable chunks
for chunk in chunk_dataframe(large_df):
    affected_rows = upserter.upsert_dataframe(chunk, 'target_table')
    print(f"Processed chunk: {affected_rows} rows")
```

Performance Optimization:

```python
# Ensure proper indexing on conflict columns
# CREATE INDEX idx_email ON target_table(email);
# CREATE INDEX idx_composite ON target_table(doc_type, doc_number);
# Use debug mode to monitor performance
upserter = PostgresqlUpsert(engine=engine, debug=True)
```

Error Handling:

```python
import logging

logger = logging.getLogger(__name__)
try:
    affected_rows = upserter.upsert_dataframe(df, 'users')
    logger.info(f"Successfully upserted {affected_rows} rows")
except ValueError as e:
    logger.error(f"Validation error: {e}")
except Exception as e:
    logger.error(f"Upsert failed: {e}")
    # No manual rollback needed - the transaction is rolled back automatically
```

We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Make your changes and add tests
- Run the test suite: `pytest tests/ -v`
- Submit a pull request
This project is licensed under the GPL v3 License - see the LICENSE file for details.
- Issues: GitHub Issues
- Documentation: Check the docstrings and test files for detailed usage examples