Skip to content

Commit f811594

Browse files
improvement(rooms): redis client closed should fail with indicator (#3115)
* improvement(rooms): redis client closed should fail fast
* bugbot comment
* consolidate
1 parent 0bc245b commit f811594

File tree

9 files changed

+140
-45
lines changed

9 files changed

+140
-45
lines changed

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
1414
import { useParams } from 'next/navigation'
1515
import { io, type Socket } from 'socket.io-client'
1616
import { getEnv } from '@/lib/core/config/env'
17+
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1718

1819
const logger = createLogger('SocketContext')
1920

@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
138139
const [authFailed, setAuthFailed] = useState(false)
139140
const initializedRef = useRef(false)
140141
const socketRef = useRef<Socket | null>(null)
142+
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
141143

142144
const params = useParams()
143145
const urlWorkflowId = params?.workflowId as string | undefined
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
341343
})
342344
})
343345

344-
socketInstance.on('join-workflow-error', ({ error }) => {
346+
socketInstance.on('join-workflow-error', ({ error, code }) => {
345347
isRejoiningRef.current = false
346-
logger.error('Failed to join workflow:', error)
348+
logger.error('Failed to join workflow:', { error, code })
349+
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
350+
triggerOfflineMode()
351+
}
347352
})
348353

349354
socketInstance.on('workflow-operation', (data) => {

apps/sim/socket/handlers/operations.ts

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,70 @@ import {
1212
import { persistWorkflowOperation } from '@/socket/database/operations'
1313
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
1414
import { checkRolePermission } from '@/socket/middleware/permissions'
15-
import type { IRoomManager } from '@/socket/rooms'
15+
import type { IRoomManager, UserSession } from '@/socket/rooms'
1616
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
1717

1818
const logger = createLogger('OperationsHandlers')
1919

2020
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
2121
socket.on('workflow-operation', async (data) => {
22-
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
23-
const session = await roomManager.getUserSession(socket.id)
22+
const emitOperationError = (
23+
forbidden: { type: string; message: string; operation?: string; target?: string },
24+
failed?: { error: string; retryable?: boolean }
25+
) => {
26+
socket.emit('operation-forbidden', forbidden)
27+
if (failed && data?.operationId) {
28+
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
29+
}
30+
}
31+
32+
if (!roomManager.isReady()) {
33+
emitOperationError(
34+
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
35+
{ error: 'Realtime unavailable', retryable: true }
36+
)
37+
return
38+
}
39+
40+
let workflowId: string | null = null
41+
let session: UserSession | null = null
42+
43+
try {
44+
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
45+
session = await roomManager.getUserSession(socket.id)
46+
} catch (error) {
47+
logger.error('Error loading session for workflow operation:', error)
48+
emitOperationError(
49+
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
50+
{ error: 'Realtime unavailable', retryable: true }
51+
)
52+
return
53+
}
2454

2555
if (!workflowId || !session) {
26-
socket.emit('operation-forbidden', {
27-
type: 'SESSION_ERROR',
28-
message: 'Session expired, please rejoin workflow',
29-
})
30-
if (data?.operationId) {
31-
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
32-
}
56+
emitOperationError(
57+
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
58+
{ error: 'Session expired' }
59+
)
3360
return
3461
}
3562

36-
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
63+
let hasRoom = false
64+
try {
65+
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
66+
} catch (error) {
67+
logger.error('Error checking workflow room:', error)
68+
emitOperationError(
69+
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
70+
{ error: 'Realtime unavailable', retryable: true }
71+
)
72+
return
73+
}
3774
if (!hasRoom) {
38-
socket.emit('operation-forbidden', {
39-
type: 'ROOM_NOT_FOUND',
40-
message: 'Workflow room not found',
41-
})
42-
if (data?.operationId) {
43-
socket.emit('operation-failed', {
44-
operationId: data.operationId,
45-
error: 'Workflow room not found',
46-
})
47-
}
75+
emitOperationError(
76+
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
77+
{ error: 'Workflow room not found' }
78+
)
4879
return
4980
}
5081

@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
77108
// Check permissions from cached role for all other operations
78109
if (!userPresence) {
79110
logger.warn(`User presence not found for socket ${socket.id}`)
80-
socket.emit('operation-forbidden', {
81-
type: 'SESSION_ERROR',
82-
message: 'User session not found',
83-
operation,
84-
target,
85-
})
86-
if (operationId) {
87-
socket.emit('operation-failed', { operationId, error: 'User session not found' })
88-
}
111+
emitOperationError(
112+
{
113+
type: 'SESSION_ERROR',
114+
message: 'User session not found',
115+
operation,
116+
target,
117+
},
118+
{ error: 'User session not found' }
119+
)
89120
return
90121
}
91122

@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
97128
logger.warn(
98129
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
99130
)
100-
socket.emit('operation-forbidden', {
131+
emitOperationError({
101132
type: 'INSUFFICIENT_PERMISSIONS',
102133
message: `${permissionCheck.reason} on '${target}'`,
103134
operation,

apps/sim/socket/handlers/subblocks.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
4848
operationId,
4949
} = data
5050

51+
if (!roomManager.isReady()) {
52+
socket.emit('operation-forbidden', {
53+
type: 'ROOM_MANAGER_UNAVAILABLE',
54+
message: 'Realtime unavailable',
55+
})
56+
if (operationId) {
57+
socket.emit('operation-failed', {
58+
operationId,
59+
error: 'Realtime unavailable',
60+
retryable: true,
61+
})
62+
}
63+
return
64+
}
65+
5166
try {
5267
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
5368
const session = await roomManager.getUserSession(socket.id)

apps/sim/socket/handlers/variables.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
3737
socket.on('variable-update', async (data) => {
3838
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
3939

40+
if (!roomManager.isReady()) {
41+
socket.emit('operation-forbidden', {
42+
type: 'ROOM_MANAGER_UNAVAILABLE',
43+
message: 'Realtime unavailable',
44+
})
45+
if (operationId) {
46+
socket.emit('operation-failed', {
47+
operationId,
48+
error: 'Realtime unavailable',
49+
retryable: true,
50+
})
51+
}
52+
return
53+
}
54+
4055
try {
4156
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
4257
const session = await roomManager.getUserSession(socket.id)

apps/sim/socket/handlers/workflow.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
2020
return
2121
}
2222

23+
if (!roomManager.isReady()) {
24+
logger.warn(`Join workflow rejected: Room manager unavailable`)
25+
socket.emit('join-workflow-error', {
26+
error: 'Realtime unavailable',
27+
code: 'ROOM_MANAGER_UNAVAILABLE',
28+
})
29+
return
30+
}
31+
2332
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
2433

2534
// Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
128137
// Undo socket.join and room manager entry if any operation failed
129138
socket.leave(workflowId)
130139
await roomManager.removeUserFromRoom(socket.id)
131-
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
140+
const isReady = roomManager.isReady()
141+
socket.emit('join-workflow-error', {
142+
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
143+
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
144+
})
132145
}
133146
})
134147

135148
socket.on('leave-workflow', async () => {
136149
try {
150+
if (!roomManager.isReady()) {
151+
return
152+
}
153+
137154
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
138155
const session = await roomManager.getUserSession(socket.id)
139156

apps/sim/socket/rooms/memory-manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
2626
logger.info('MemoryRoomManager initialized (single-pod mode)')
2727
}
2828

29+
isReady(): boolean {
30+
return true
31+
}
32+
2933
async shutdown(): Promise<void> {
3034
this.workflowRooms.clear()
3135
this.socketToWorkflow.clear()

apps/sim/socket/rooms/redis-manager.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
9696
this._io = io
9797
this.redis = createClient({
9898
url: redisUrl,
99-
socket: {
100-
reconnectStrategy: (retries) => {
101-
if (retries > 10) {
102-
logger.error('Redis reconnection failed after 10 attempts')
103-
return new Error('Redis reconnection failed')
104-
}
105-
const delay = Math.min(retries * 100, 3000)
106-
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
107-
return delay
108-
},
109-
},
11099
})
111100

112101
this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
122111
logger.info('Redis client ready')
123112
this.isConnected = true
124113
})
114+
115+
this.redis.on('end', () => {
116+
logger.warn('Redis client connection closed')
117+
this.isConnected = false
118+
})
125119
}
126120

127121
get io(): Server {
128122
return this._io
129123
}
130124

125+
isReady(): boolean {
126+
return this.isConnected
127+
}
128+
131129
async initialize(): Promise<void> {
132130
if (this.isConnected) return
133131

apps/sim/socket/rooms/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ export interface IRoomManager {
4848
*/
4949
initialize(): Promise<void>
5050

51+
/**
52+
* Whether the room manager is ready to serve requests
53+
*/
54+
isReady(): boolean
55+
5156
/**
5257
* Clean shutdown
5358
*/

apps/sim/socket/routes/http.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
8585
res.end(JSON.stringify({ error: authResult.error }))
8686
return
8787
}
88+
89+
if (!roomManager.isReady()) {
90+
sendError(res, 'Room manager unavailable', 503)
91+
return
92+
}
8893
}
8994

9095
// Handle workflow deletion notifications from the main API

0 commit comments

Comments
 (0)