Skip to content

Commit ad68624

Browse files
BukeLy and claude
committed
feat: PostgreSQL model isolation and auto-migration
Why this change is needed:
PostgreSQL vector storage needs model isolation to prevent dimension conflicts when different workspaces use different embedding models. Without this, the first workspace locks the vector dimension for all subsequent workspaces, causing failures.

How it solves it:
- Implements dynamic table naming with model suffix: {table}_{model}_{dim}d
- Adds setup_table() method mirroring Qdrant's approach for consistency
- Implements 4-branch migration logic: both exist -> warn, only new -> use, neither -> create, only legacy -> migrate
- Batch migration: 500 records/batch (same as Qdrant)
- No automatic rollback to support idempotent re-runs

Impact:
- PostgreSQL tables now isolated by embedding model and dimension
- Automatic data migration from legacy tables on startup
- Backward compatible: model_name=None defaults to "unknown"
- All SQL operations use dynamic table names

Testing:
- 6 new tests for PostgreSQL migration (100% pass)
- Tests cover: naming, migration trigger, scenarios 1-3
- 3 additional scenario tests added for Qdrant completeness

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent df5aacb commit ad68624

File tree

4 files changed

+798
-23
lines changed

4 files changed

+798
-23
lines changed

lightrag/kg/postgres_impl.py

Lines changed: 223 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2175,6 +2175,38 @@ async def drop(self) -> dict[str, str]:
21752175
return {"status": "error", "message": str(e)}
21762176

21772177

2178+
async def _pg_table_exists(db: PostgreSQLDB, table_name: str) -> bool:
2179+
"""Check if a table exists in PostgreSQL database"""
2180+
query = """
2181+
SELECT EXISTS (
2182+
SELECT FROM information_schema.tables
2183+
WHERE table_name = $1
2184+
)
2185+
"""
2186+
result = await db.query(query, [table_name.lower()])
2187+
return result.get("exists", False) if result else False
2188+
2189+
2190+
async def _pg_create_table(
2191+
db: PostgreSQLDB, table_name: str, base_table: str, embedding_dim: int
2192+
) -> None:
2193+
"""Create a new vector table by replacing the table name in DDL template"""
2194+
if base_table not in TABLES:
2195+
raise ValueError(f"No DDL template found for table: {base_table}")
2196+
2197+
ddl_template = TABLES[base_table]["ddl"]
2198+
2199+
# Replace embedding dimension placeholder if exists
2200+
ddl = ddl_template.replace(
2201+
f"VECTOR({os.environ.get('EMBEDDING_DIM', 1024)})", f"VECTOR({embedding_dim})"
2202+
)
2203+
2204+
# Replace table name
2205+
ddl = ddl.replace(base_table, table_name)
2206+
2207+
await db.execute(ddl)
2208+
2209+
21782210
@final
21792211
@dataclass
21802212
class PGVectorStorage(BaseVectorStorage):
@@ -2190,6 +2222,163 @@ def __post_init__(self):
21902222
)
21912223
self.cosine_better_than_threshold = cosine_threshold
21922224

2225+
# Generate model suffix for table isolation
2226+
self.model_suffix = self._generate_collection_suffix()
2227+
2228+
# Get base table name
2229+
base_table = namespace_to_table_name(self.namespace)
2230+
if not base_table:
2231+
raise ValueError(f"Unknown namespace: {self.namespace}")
2232+
2233+
# New table name (with suffix)
2234+
self.table_name = f"{base_table}_{self.model_suffix}"
2235+
2236+
# Legacy table name (without suffix, for migration)
2237+
self.legacy_table_name = base_table
2238+
2239+
logger.debug(
2240+
f"PostgreSQL table naming: "
2241+
f"new='{self.table_name}', "
2242+
f"legacy='{self.legacy_table_name}', "
2243+
f"model_suffix='{self.model_suffix}'"
2244+
)
2245+
2246+
@staticmethod
async def setup_table(
    db: PostgreSQLDB,
    table_name: str,
    legacy_table_name: "str | None" = None,
    base_table: "str | None" = None,
    embedding_dim: "int | None" = None,
):
    """
    Ensure the model-suffixed vector table exists, migrating legacy data if needed.

    Mirrors Qdrant's setup_collection approach for consistency. Four cases:
      1. Both new and legacy tables exist -> warn only (migration assumed done).
      2. Only the new table exists        -> nothing to do.
      3. Neither exists                   -> create the new table.
      4. Only the legacy table exists     -> create the new table, copy all
         rows over in batches, then verify by row count.

    Args:
        db: PostgreSQLDB instance
        table_name: Name of the new (model-suffixed) table
        legacy_table_name: Name of the legacy table to migrate from, if any
        base_table: Base table name for DDL template lookup
        embedding_dim: Embedding dimension for the vector column

    Raises:
        PostgreSQLMigrationError: If migration fails or the post-migration
            row count does not match the legacy table. No automatic rollback
            is performed, so a re-run can resume idempotently (inserts use
            ON CONFLICT DO NOTHING).
    """
    new_table_exists = await _pg_table_exists(db, table_name)
    legacy_exists = legacy_table_name and await _pg_table_exists(
        db, legacy_table_name
    )

    # Case 1: Both new and legacy tables exist - Warning only (no migration)
    if new_table_exists and legacy_exists:
        logger.warning(
            f"PostgreSQL: Legacy table '{legacy_table_name}' still exists. "
            f"Remove it if migration is complete."
        )
        return

    # Case 2: Only new table exists - Already migrated or newly created
    if new_table_exists:
        logger.debug(f"PostgreSQL: Table '{table_name}' already exists")
        return

    # Case 3: Neither exists - Create new table
    if not legacy_exists:
        logger.info(f"PostgreSQL: Creating new table '{table_name}'")
        await _pg_create_table(db, table_name, base_table, embedding_dim)
        logger.info(f"PostgreSQL: Table '{table_name}' created successfully")
        return

    # Case 4: Only legacy exists - Migrate data
    logger.info(
        f"PostgreSQL: Migrating data from legacy table '{legacy_table_name}'"
    )

    try:
        # Get legacy table count
        count_query = f"SELECT COUNT(*) as count FROM {legacy_table_name}"
        count_result = await db.query(count_query, [])
        legacy_count = count_result.get("count", 0) if count_result else 0
        logger.info(f"PostgreSQL: Found {legacy_count} records in legacy table")

        if legacy_count == 0:
            logger.info("PostgreSQL: Legacy table is empty, skipping migration")
            await _pg_create_table(db, table_name, base_table, embedding_dim)
            return

        # Create new table first
        logger.info(f"PostgreSQL: Creating new table '{table_name}'")
        await _pg_create_table(db, table_name, base_table, embedding_dim)

        # Batch migration (500 records per batch, same as Qdrant)
        migrated_count = 0
        offset = 0
        batch_size = 500  # Mirror Qdrant batch size

        while True:
            # BUGFIX: OFFSET/LIMIT paging without ORDER BY is not stable in
            # PostgreSQL - rows could be skipped or duplicated between
            # batches. Order by the key column (all three vector tables
            # conflict on (workspace, id)) to make paging deterministic.
            select_query = (
                f"SELECT * FROM {legacy_table_name} ORDER BY id OFFSET $1 LIMIT $2"
            )
            rows = await db.fetch(select_query, [offset, batch_size])

            if not rows:
                break

            # Insert batch into new table, one parameterized INSERT per row
            for row in rows:
                columns = list(row.keys())
                values = list(row.values())

                placeholders = ", ".join([f"${i+1}" for i in range(len(columns))])
                columns_str = ", ".join(columns)
                # ON CONFLICT DO NOTHING keeps re-runs idempotent after a
                # partial migration.
                insert_query = f"""
                    INSERT INTO {table_name} ({columns_str})
                    VALUES ({placeholders})
                    ON CONFLICT DO NOTHING
                """

                await db.execute(insert_query, values)

            migrated_count += len(rows)
            logger.info(
                f"PostgreSQL: {migrated_count}/{legacy_count} records migrated"
            )

            # A short batch means the legacy table is exhausted; skip the
            # extra empty round-trip.
            if len(rows) < batch_size:
                break
            offset += batch_size

        # Verify migration by comparing counts
        logger.info("Verifying migration...")
        new_count_query = f"SELECT COUNT(*) as count FROM {table_name}"
        new_count_result = await db.query(new_count_query, [])
        new_count = new_count_result.get("count", 0) if new_count_result else 0

        if new_count != legacy_count:
            error_msg = (
                f"PostgreSQL: Migration verification failed, "
                f"expected {legacy_count} records, got {new_count} in new table"
            )
            logger.error(error_msg)
            raise PostgreSQLMigrationError(error_msg)

        logger.info(
            f"PostgreSQL: Migration completed successfully: {migrated_count} records migrated"
        )
        logger.info(
            f"PostgreSQL: Migration from '{legacy_table_name}' to '{table_name}' completed successfully"
        )

    except PostgreSQLMigrationError:
        # Re-raise migration errors without wrapping
        raise
    except Exception as e:
        error_msg = f"PostgreSQL: Migration failed with error: {e}"
        logger.error(error_msg)
        # Mirror Qdrant behavior: no automatic rollback
        # Reason: partial data can be continued by re-running migration
        raise PostgreSQLMigrationError(error_msg) from e
2381+
21932382
async def initialize(self):
21942383
async with get_data_init_lock():
21952384
if self.db is None:
@@ -2206,6 +2395,15 @@ async def initialize(self):
22062395
# Use "default" for compatibility (lowest priority)
22072396
self.workspace = "default"
22082397

2398+
# Setup table (create if not exists and handle migration)
2399+
await PGVectorStorage.setup_table(
2400+
self.db,
2401+
self.table_name,
2402+
legacy_table_name=self.legacy_table_name,
2403+
base_table=self.legacy_table_name, # base_table for DDL template lookup
2404+
embedding_dim=self.embedding_func.embedding_dim,
2405+
)
2406+
22092407
async def finalize(self):
22102408
if self.db is not None:
22112409
await ClientManager.release_client(self.db)
@@ -2215,7 +2413,9 @@ def _upsert_chunks(
22152413
self, item: dict[str, Any], current_time: datetime.datetime
22162414
) -> tuple[str, dict[str, Any]]:
22172415
try:
2218-
upsert_sql = SQL_TEMPLATES["upsert_chunk"]
2416+
upsert_sql = SQL_TEMPLATES["upsert_chunk"].format(
2417+
table_name=self.table_name
2418+
)
22192419
data: dict[str, Any] = {
22202420
"workspace": self.workspace,
22212421
"id": item["__id__"],
@@ -2239,7 +2439,7 @@ def _upsert_chunks(
22392439
def _upsert_entities(
22402440
self, item: dict[str, Any], current_time: datetime.datetime
22412441
) -> tuple[str, dict[str, Any]]:
2242-
upsert_sql = SQL_TEMPLATES["upsert_entity"]
2442+
upsert_sql = SQL_TEMPLATES["upsert_entity"].format(table_name=self.table_name)
22432443
source_id = item["source_id"]
22442444
if isinstance(source_id, str) and "<SEP>" in source_id:
22452445
chunk_ids = source_id.split("<SEP>")
@@ -2262,7 +2462,9 @@ def _upsert_entities(
22622462
def _upsert_relationships(
22632463
self, item: dict[str, Any], current_time: datetime.datetime
22642464
) -> tuple[str, dict[str, Any]]:
2265-
upsert_sql = SQL_TEMPLATES["upsert_relationship"]
2465+
upsert_sql = SQL_TEMPLATES["upsert_relationship"].format(
2466+
table_name=self.table_name
2467+
)
22662468
source_id = item["source_id"]
22672469
if isinstance(source_id, str) and "<SEP>" in source_id:
22682470
chunk_ids = source_id.split("<SEP>")
@@ -2335,7 +2537,9 @@ async def query(
23352537

23362538
embedding_string = ",".join(map(str, embedding))
23372539

2338-
sql = SQL_TEMPLATES[self.namespace].format(embedding_string=embedding_string)
2540+
sql = SQL_TEMPLATES[self.namespace].format(
2541+
embedding_string=embedding_string, table_name=self.table_name
2542+
)
23392543
params = {
23402544
"workspace": self.workspace,
23412545
"closer_than_threshold": 1 - self.cosine_better_than_threshold,
@@ -2357,14 +2561,7 @@ async def delete(self, ids: list[str]) -> None:
23572561
if not ids:
23582562
return
23592563

2360-
table_name = namespace_to_table_name(self.namespace)
2361-
if not table_name:
2362-
logger.error(
2363-
f"[{self.workspace}] Unknown namespace for vector deletion: {self.namespace}"
2364-
)
2365-
return
2366-
2367-
delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
2564+
delete_sql = f"DELETE FROM {self.table_name} WHERE workspace=$1 AND id = ANY($2)"
23682565

23692566
try:
23702567
await self.db.execute(delete_sql, {"workspace": self.workspace, "ids": ids})
@@ -2383,8 +2580,8 @@ async def delete_entity(self, entity_name: str) -> None:
23832580
entity_name: The name of the entity to delete
23842581
"""
23852582
try:
2386-
# Construct SQL to delete the entity
2387-
delete_sql = """DELETE FROM LIGHTRAG_VDB_ENTITY
2583+
# Construct SQL to delete the entity using dynamic table name
2584+
delete_sql = f"""DELETE FROM {self.table_name}
23882585
WHERE workspace=$1 AND entity_name=$2"""
23892586

23902587
await self.db.execute(
@@ -2404,7 +2601,7 @@ async def delete_entity_relation(self, entity_name: str) -> None:
24042601
"""
24052602
try:
24062603
# Delete relations where the entity is either the source or target
2407-
delete_sql = """DELETE FROM LIGHTRAG_VDB_RELATION
2604+
delete_sql = f"""DELETE FROM {self.table_name}
24082605
WHERE workspace=$1 AND (source_id=$2 OR target_id=$2)"""
24092606

24102607
await self.db.execute(
@@ -3188,6 +3385,11 @@ async def drop(self) -> dict[str, str]:
31883385
return {"status": "error", "message": str(e)}
31893386

31903387

3388+
class PostgreSQLMigrationError(Exception):
    """Raised when a PostgreSQL vector-table migration fails or cannot be verified."""
3391+
3392+
31913393
class PGGraphQueryException(Exception):
31923394
"""Exception for the AGE queries."""
31933395

@@ -5047,7 +5249,7 @@ def namespace_to_table_name(namespace: str) -> str:
50475249
update_time = EXCLUDED.update_time
50485250
""",
50495251
# SQL for VectorStorage
5050-
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
5252+
"upsert_chunk": """INSERT INTO {table_name} (workspace, id, tokens,
50515253
chunk_order_index, full_doc_id, content, content_vector, file_path,
50525254
create_time, update_time)
50535255
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
@@ -5060,7 +5262,7 @@ def namespace_to_table_name(namespace: str) -> str:
50605262
file_path=EXCLUDED.file_path,
50615263
update_time = EXCLUDED.update_time
50625264
""",
5063-
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
5265+
"upsert_entity": """INSERT INTO {table_name} (workspace, id, entity_name, content,
50645266
content_vector, chunk_ids, file_path, create_time, update_time)
50655267
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
50665268
ON CONFLICT (workspace,id) DO UPDATE
@@ -5071,7 +5273,7 @@ def namespace_to_table_name(namespace: str) -> str:
50715273
file_path=EXCLUDED.file_path,
50725274
update_time=EXCLUDED.update_time
50735275
""",
5074-
"upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
5276+
"upsert_relationship": """INSERT INTO {table_name} (workspace, id, source_id,
50755277
target_id, content, content_vector, chunk_ids, file_path, create_time, update_time)
50765278
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10)
50775279
ON CONFLICT (workspace,id) DO UPDATE
@@ -5087,7 +5289,7 @@ def namespace_to_table_name(namespace: str) -> str:
50875289
SELECT r.source_id AS src_id,
50885290
r.target_id AS tgt_id,
50895291
EXTRACT(EPOCH FROM r.create_time)::BIGINT AS created_at
5090-
FROM LIGHTRAG_VDB_RELATION r
5292+
FROM {table_name} r
50915293
WHERE r.workspace = $1
50925294
AND r.content_vector <=> '[{embedding_string}]'::vector < $2
50935295
ORDER BY r.content_vector <=> '[{embedding_string}]'::vector
@@ -5096,7 +5298,7 @@ def namespace_to_table_name(namespace: str) -> str:
50965298
"entities": """
50975299
SELECT e.entity_name,
50985300
EXTRACT(EPOCH FROM e.create_time)::BIGINT AS created_at
5099-
FROM LIGHTRAG_VDB_ENTITY e
5301+
FROM {table_name} e
51005302
WHERE e.workspace = $1
51015303
AND e.content_vector <=> '[{embedding_string}]'::vector < $2
51025304
ORDER BY e.content_vector <=> '[{embedding_string}]'::vector
@@ -5107,7 +5309,7 @@ def namespace_to_table_name(namespace: str) -> str:
51075309
c.content,
51085310
c.file_path,
51095311
EXTRACT(EPOCH FROM c.create_time)::BIGINT AS created_at
5110-
FROM LIGHTRAG_VDB_CHUNKS c
5312+
FROM {table_name} c
51115313
WHERE c.workspace = $1
51125314
AND c.content_vector <=> '[{embedding_string}]'::vector < $2
51135315
ORDER BY c.content_vector <=> '[{embedding_string}]'::vector

0 commit comments

Comments
 (0)