Skip to content

Commit 1b309b5

Browse files
fix(idempotency): add conflict target to atomicallyClaimDb query + remove redundant db namespace tracking (#2950)
* fix(idempotency): add conflict target to atomicallyClaimDb query * delete needs to account for namespace * simplify namespace filtering logic * fix cleanup * consistent target
1 parent f765b83 commit 1b309b5

File tree

6 files changed

+10396
-60
lines changed

6 files changed

+10396
-60
lines changed

apps/sim/lib/core/idempotency/cleanup.ts

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { db } from '@sim/db'
22
import { idempotencyKey } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
4-
import { and, eq, lt } from 'drizzle-orm'
4+
import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm'
55

66
const logger = createLogger('IdempotencyCleanup')
77

@@ -19,7 +19,8 @@ export interface CleanupOptions {
1919
batchSize?: number
2020

2121
/**
22-
* Specific namespace to clean up, or undefined to clean all namespaces
22+
* Specific namespace prefix to clean up (e.g., 'webhook', 'polling')
23+
* Keys are prefixed with namespace, so this filters by key prefix
2324
*/
2425
namespace?: string
2526
}
@@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys(
5354

5455
while (hasMore) {
5556
try {
57+
// Build where condition - filter by cutoff date and optionally by namespace prefix
5658
const whereCondition = namespace
57-
? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
59+
? and(
60+
lt(idempotencyKey.createdAt, cutoffDate),
61+
like(idempotencyKey.key, `${namespace}:%`)
62+
)
5863
: lt(idempotencyKey.createdAt, cutoffDate)
5964

60-
// First, find IDs to delete with limit
65+
// Find keys to delete with limit
6166
const toDelete = await db
62-
.select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
67+
.select({ key: idempotencyKey.key })
6368
.from(idempotencyKey)
6469
.where(whereCondition)
6570
.limit(batchSize)
@@ -68,14 +73,13 @@ export async function cleanupExpiredIdempotencyKeys(
6873
break
6974
}
7075

71-
// Delete the found records
76+
// Delete the found records by key
7277
const deleteResult = await db
7378
.delete(idempotencyKey)
7479
.where(
75-
and(
76-
...toDelete.map((item) =>
77-
and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
78-
)
80+
inArray(
81+
idempotencyKey.key,
82+
toDelete.map((item) => item.key)
7983
)
8084
)
8185
.returning({ key: idempotencyKey.key })
@@ -126,6 +130,7 @@ export async function cleanupExpiredIdempotencyKeys(
126130

127131
/**
128132
* Get statistics about idempotency key usage
133+
* Uses SQL aggregations to avoid loading all keys into memory
129134
*/
130135
export async function getIdempotencyKeyStats(): Promise<{
131136
totalKeys: number
@@ -134,34 +139,35 @@ export async function getIdempotencyKeyStats(): Promise<{
134139
newestKey: Date | null
135140
}> {
136141
try {
137-
const allKeys = await db
142+
// Get total count and date range in a single query
143+
const [statsResult] = await db
138144
.select({
139-
namespace: idempotencyKey.namespace,
140-
createdAt: idempotencyKey.createdAt,
145+
totalKeys: count(),
146+
oldestKey: min(idempotencyKey.createdAt),
147+
newestKey: max(idempotencyKey.createdAt),
141148
})
142149
.from(idempotencyKey)
143150

144-
const totalKeys = allKeys.length
145-
const keysByNamespace: Record<string, number> = {}
146-
let oldestKey: Date | null = null
147-
let newestKey: Date | null = null
148-
149-
for (const key of allKeys) {
150-
keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1
151+
// Get counts by namespace prefix using SQL substring
152+
// Extracts everything before the first ':' as the namespace
153+
const namespaceStats = await db
154+
.select({
155+
namespace: sql<string>`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'),
156+
count: count(),
157+
})
158+
.from(idempotencyKey)
159+
.groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`)
151160

152-
if (!oldestKey || key.createdAt < oldestKey) {
153-
oldestKey = key.createdAt
154-
}
155-
if (!newestKey || key.createdAt > newestKey) {
156-
newestKey = key.createdAt
157-
}
161+
const keysByNamespace: Record<string, number> = {}
162+
for (const row of namespaceStats) {
163+
keysByNamespace[row.namespace || 'unknown'] = row.count
158164
}
159165

160166
return {
161-
totalKeys,
167+
totalKeys: statsResult?.totalKeys ?? 0,
162168
keysByNamespace,
163-
oldestKey,
164-
newestKey,
169+
oldestKey: statsResult?.oldestKey ?? null,
170+
newestKey: statsResult?.newestKey ?? null,
165171
}
166172
} catch (error) {
167173
logger.error('Failed to get idempotency key stats:', error)

apps/sim/lib/core/idempotency/service.ts

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { randomUUID } from 'crypto'
22
import { db } from '@sim/db'
33
import { idempotencyKey } from '@sim/db/schema'
44
import { createLogger } from '@sim/logger'
5-
import { and, eq } from 'drizzle-orm'
5+
import { eq } from 'drizzle-orm'
66
import { getRedisClient } from '@/lib/core/config/redis'
77
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
88
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
@@ -124,12 +124,7 @@ export class IdempotencyService {
124124
const existing = await db
125125
.select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
126126
.from(idempotencyKey)
127-
.where(
128-
and(
129-
eq(idempotencyKey.key, normalizedKey),
130-
eq(idempotencyKey.namespace, this.config.namespace)
131-
)
132-
)
127+
.where(eq(idempotencyKey.key, normalizedKey))
133128
.limit(1)
134129

135130
if (existing.length > 0) {
@@ -224,11 +219,12 @@ export class IdempotencyService {
224219
.insert(idempotencyKey)
225220
.values({
226221
key: normalizedKey,
227-
namespace: this.config.namespace,
228222
result: inProgressResult,
229223
createdAt: new Date(),
230224
})
231-
.onConflictDoNothing()
225+
.onConflictDoNothing({
226+
target: [idempotencyKey.key],
227+
})
232228
.returning({ key: idempotencyKey.key })
233229

234230
if (insertResult.length > 0) {
@@ -243,12 +239,7 @@ export class IdempotencyService {
243239
const existing = await db
244240
.select({ result: idempotencyKey.result })
245241
.from(idempotencyKey)
246-
.where(
247-
and(
248-
eq(idempotencyKey.key, normalizedKey),
249-
eq(idempotencyKey.namespace, this.config.namespace)
250-
)
251-
)
242+
.where(eq(idempotencyKey.key, normalizedKey))
252243
.limit(1)
253244

254245
const existingResult =
@@ -280,12 +271,7 @@ export class IdempotencyService {
280271
const existing = await db
281272
.select({ result: idempotencyKey.result })
282273
.from(idempotencyKey)
283-
.where(
284-
and(
285-
eq(idempotencyKey.key, normalizedKey),
286-
eq(idempotencyKey.namespace, this.config.namespace)
287-
)
288-
)
274+
.where(eq(idempotencyKey.key, normalizedKey))
289275
.limit(1)
290276
currentResult = existing.length > 0 ? (existing[0].result as ProcessingResult) : null
291277
}
@@ -339,12 +325,11 @@ export class IdempotencyService {
339325
.insert(idempotencyKey)
340326
.values({
341327
key: normalizedKey,
342-
namespace: this.config.namespace,
343328
result: result,
344329
createdAt: new Date(),
345330
})
346331
.onConflictDoUpdate({
347-
target: [idempotencyKey.key, idempotencyKey.namespace],
332+
target: [idempotencyKey.key],
348333
set: {
349334
result: result,
350335
createdAt: new Date(),
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DROP INDEX "idempotency_key_namespace_unique";--> statement-breakpoint
2+
DROP INDEX "idempotency_key_namespace_idx";--> statement-breakpoint
3+
ALTER TABLE "idempotency_key" ADD PRIMARY KEY ("key");--> statement-breakpoint
4+
ALTER TABLE "idempotency_key" DROP COLUMN "namespace";

0 commit comments

Comments
 (0)