Skip to content

Commit 6e66168

Browse files
committed
fix(redis): prevent false rate limits and code execution failures during Redis outages
1 parent 2a6d4fc commit 6e66168

File tree

8 files changed

+415
-39
lines changed

8 files changed

+415
-39
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/**
 * Tests for the Redis client config: the periodic PING health check, the
 * reconnect-listener registry, graceful shutdown, and the retry backoff.
 *
 * NOTE(review): `mockRedisInstance` is shared across tests; `clearAllMocks`
 * in beforeEach resets its call/queue state, and `resetModules` in afterEach
 * gives each test a fresh copy of the module-level state in './redis'.
 */
import { createEnvMock, createMockRedis, loggerMock } from '@sim/testing'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

const mockRedisInstance = createMockRedis()

vi.mock('@sim/logger', () => loggerMock)
vi.mock('@/lib/core/config/env', () => createEnvMock({ REDIS_URL: 'redis://localhost:6379' }))
vi.mock('ioredis', () => ({
  default: vi.fn(() => mockRedisInstance),
}))

describe('redis config', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    vi.useFakeTimers()
  })

  afterEach(() => {
    vi.useRealTimers()
    vi.resetModules()
  })

  describe('onRedisReconnect', () => {
    it('should register and invoke reconnect listeners', async () => {
      const { onRedisReconnect, getRedisClient } = await import('./redis')
      const listener = vi.fn()
      onRedisReconnect(listener)

      getRedisClient()

      // Three failed 30s PING ticks cross the failure threshold exactly once.
      // Ticks are advanced one at a time so each async ping settles before
      // the next interval fires.
      mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)

      expect(listener).toHaveBeenCalledTimes(1)
    })

    it('should not invoke listeners when PINGs succeed', async () => {
      const { onRedisReconnect, getRedisClient } = await import('./redis')
      const listener = vi.fn()
      onRedisReconnect(listener)

      getRedisClient()
      mockRedisInstance.ping.mockResolvedValue('PONG')

      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)

      expect(listener).not.toHaveBeenCalled()
    })

    it('should reset failure count on successful PING', async () => {
      const { onRedisReconnect, getRedisClient } = await import('./redis')
      const listener = vi.fn()
      onRedisReconnect(listener)

      getRedisClient()

      // 2 failures then a success — should reset counter
      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000)
      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000)
      mockRedisInstance.ping.mockResolvedValueOnce('PONG')
      await vi.advanceTimersByTimeAsync(30_000)

      // 2 more failures — should NOT trigger reconnect (counter was reset)
      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000)
      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000)

      expect(listener).not.toHaveBeenCalled()
    })

    it('should call disconnect(true) after 3 consecutive PING failures', async () => {
      const { getRedisClient } = await import('./redis')
      getRedisClient()

      mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)

      // Two failures are still below the threshold — no forced disconnect yet.
      expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()

      // Third failure forces disconnect(true) so ioredis reconnects.
      await vi.advanceTimersByTimeAsync(30_000)
      expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
    })

    it('should handle listener errors gracefully without breaking health check', async () => {
      const { onRedisReconnect, getRedisClient } = await import('./redis')
      const badListener = vi.fn(() => {
        throw new Error('listener crashed')
      })
      const goodListener = vi.fn()
      onRedisReconnect(badListener)
      onRedisReconnect(goodListener)

      getRedisClient()
      mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)
      await vi.advanceTimersByTimeAsync(30_000)

      // A throwing listener must not prevent later listeners from running.
      expect(badListener).toHaveBeenCalledTimes(1)
      expect(goodListener).toHaveBeenCalledTimes(1)
    })
  })

  describe('closeRedisConnection', () => {
    it('should clear the PING interval', async () => {
      const { getRedisClient, closeRedisConnection } = await import('./redis')
      getRedisClient()

      mockRedisInstance.quit.mockResolvedValue('OK')
      await closeRedisConnection()

      // After closing, PING failures should not trigger disconnect
      mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
      await vi.advanceTimersByTimeAsync(30_000 * 5)
      expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
    })
  })

  describe('retryStrategy', () => {
    // Re-imports './redis' with a capturing ioredis mock and returns the
    // `retryStrategy` option passed to the Redis constructor.
    async function captureRetryStrategy(): Promise<(times: number) => number> {
      // Fresh module registry so the doMock calls below (which, unlike
      // vi.mock, are not hoisted) take effect for this import.
      vi.resetModules()

      vi.doMock('@sim/logger', () => loggerMock)
      vi.doMock('@/lib/core/config/env', () =>
        createEnvMock({ REDIS_URL: 'redis://localhost:6379' })
      )

      let capturedConfig: Record<string, unknown> = {}
      vi.doMock('ioredis', () => ({
        default: vi.fn((_url: string, config: Record<string, unknown>) => {
          capturedConfig = config
          return { ping: vi.fn(), on: vi.fn() }
        }),
      }))

      const { getRedisClient } = await import('./redis')
      getRedisClient()

      return capturedConfig.retryStrategy as (times: number) => number
    }

    it('should use exponential backoff with jitter', async () => {
      const retryStrategy = await captureRetryStrategy()
      expect(retryStrategy).toBeDefined()

      // Base for attempt 1: min(1000 * 2^0, 10000) = 1000, jitter up to 300
      const delay1 = retryStrategy(1)
      expect(delay1).toBeGreaterThanOrEqual(1000)
      expect(delay1).toBeLessThanOrEqual(1300)

      // Base for attempt 3: min(1000 * 2^2, 10000) = 4000, jitter up to 1200
      const delay3 = retryStrategy(3)
      expect(delay3).toBeGreaterThanOrEqual(4000)
      expect(delay3).toBeLessThanOrEqual(5200)

      // Base for attempt 5: min(1000 * 2^4, 10000) = 10000, jitter up to 3000
      const delay5 = retryStrategy(5)
      expect(delay5).toBeGreaterThanOrEqual(10000)
      expect(delay5).toBeLessThanOrEqual(13000)
    })

    it('should cap at 30s for attempts beyond 10', async () => {
      const retryStrategy = await captureRetryStrategy()
      expect(retryStrategy(11)).toBe(30000)
      expect(retryStrategy(100)).toBe(30000)
    })
  })
})

apps/sim/lib/core/config/redis.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,63 @@ const logger = createLogger('Redis')
77
const redisUrl = env.REDIS_URL
88

99
let globalRedisClient: Redis | null = null
10+
let pingFailures = 0
11+
let pingInterval: NodeJS.Timeout | null = null
12+
let pingInFlight = false
13+
14+
const PING_INTERVAL_MS = 30_000
15+
const MAX_PING_FAILURES = 3
16+
17+
/** Callbacks invoked when the PING health check forces a reconnect. */
18+
const reconnectListeners: Array<() => void> = []
19+
20+
/**
21+
* Register a callback that fires when the PING health check forces a reconnect.
22+
* Useful for resetting cached adapters that hold a stale Redis reference.
23+
*/
24+
export function onRedisReconnect(cb: () => void): void {
25+
reconnectListeners.push(cb)
26+
}
27+
28+
function startPingHealthCheck(redis: Redis): void {
29+
if (pingInterval) return
30+
31+
pingInterval = setInterval(async () => {
32+
if (pingInFlight) return
33+
pingInFlight = true
34+
try {
35+
await redis.ping()
36+
pingFailures = 0
37+
} catch (error) {
38+
pingFailures++
39+
logger.warn('Redis PING failed', {
40+
consecutiveFailures: pingFailures,
41+
error: error instanceof Error ? error.message : String(error),
42+
})
43+
44+
if (pingFailures >= MAX_PING_FAILURES) {
45+
logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
46+
consecutiveFailures: pingFailures,
47+
})
48+
pingFailures = 0
49+
for (const cb of reconnectListeners) {
50+
try {
51+
cb()
52+
} catch (cbError) {
53+
logger.error('Redis reconnect listener error', { error: cbError })
54+
}
55+
}
56+
try {
57+
redis.disconnect(true)
58+
} catch (disconnectError) {
59+
logger.error('Error during forced Redis disconnect', { error: disconnectError })
60+
}
61+
}
62+
} finally {
63+
pingInFlight = false
64+
}
65+
}, PING_INTERVAL_MS)
66+
}
1067

1168
/**
1269
* Get a Redis client instance.
@@ -35,8 +92,10 @@ export function getRedisClient(): Redis | null {
3592
logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 })
3693
return 30000
3794
}
38-
const delay = Math.min(times * 500, 5000)
39-
logger.warn(`Redis reconnecting`, { attempt: times, nextRetryMs: delay })
95+
const base = Math.min(1000 * 2 ** (times - 1), 10000)
96+
const jitter = Math.random() * base * 0.3
97+
const delay = Math.round(base + jitter)
98+
logger.warn('Redis reconnecting', { attempt: times, nextRetryMs: delay })
4099
return delay
41100
},
42101

@@ -54,6 +113,8 @@ export function getRedisClient(): Redis | null {
54113
globalRedisClient.on('close', () => logger.warn('Redis connection closed'))
55114
globalRedisClient.on('end', () => logger.error('Redis connection ended'))
56115

116+
startPingHealthCheck(globalRedisClient)
117+
57118
return globalRedisClient
58119
} catch (error) {
59120
logger.error('Failed to initialize Redis client', { error })
@@ -118,6 +179,11 @@ export async function releaseLock(lockKey: string, value: string): Promise<boole
118179
* Use for graceful shutdown.
119180
*/
120181
export async function closeRedisConnection(): Promise<void> {
182+
if (pingInterval) {
183+
clearInterval(pingInterval)
184+
pingInterval = null
185+
}
186+
121187
if (globalRedisClient) {
122188
try {
123189
await globalRedisClient.quit()

apps/sim/lib/core/rate-limiter/rate-limiter.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ describe('RateLimiter', () => {
172172
)
173173
})
174174

175-
it('should deny on storage error (fail closed)', async () => {
175+
it('should allow on storage error (fail open)', async () => {
176176
mockAdapter.consumeTokens.mockRejectedValue(new Error('Storage error'))
177177

178178
const result = await rateLimiter.checkRateLimitWithSubscription(
@@ -182,8 +182,8 @@ describe('RateLimiter', () => {
182182
false
183183
)
184184

185-
expect(result.allowed).toBe(false)
186-
expect(result.remaining).toBe(0)
185+
expect(result.allowed).toBe(true)
186+
expect(result.remaining).toBe(1)
187187
})
188188

189189
it('should work for all non-manual trigger types', async () => {

apps/sim/lib/core/rate-limiter/rate-limiter.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,16 @@ export class RateLimiter {
100100
retryAfterMs: result.retryAfterMs,
101101
}
102102
} catch (error) {
103-
logger.error('Rate limit storage error - failing closed (denying request)', {
103+
logger.error('Rate limit storage error - failing open (allowing request)', {
104104
error: error instanceof Error ? error.message : String(error),
105105
userId,
106106
triggerType,
107107
isAsync,
108108
})
109109
return {
110-
allowed: false,
111-
remaining: 0,
110+
allowed: true,
111+
remaining: 1,
112112
resetAt: new Date(Date.now() + RATE_LIMIT_WINDOW_MS),
113-
retryAfterMs: RATE_LIMIT_WINDOW_MS,
114113
}
115114
}
116115
}

0 commit comments

Comments
 (0)