Skip to content

Commit fb6f555

Browse files
improvement(sockets): add batch subblock updates for duplicate to clear queue faster (#835)
1 parent 84f095d commit fb6f555

File tree

4 files changed

+278
-9
lines changed

4 files changed

+278
-9
lines changed

apps/sim/contexts/socket-context.tsx

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,17 @@ interface SocketContextType {
5050
value: any,
5151
operationId?: string
5252
) => void
53+
emitBatchSubblockUpdate: (
54+
blockId: string,
55+
subblockValues: Record<string, any>,
56+
operationId?: string
57+
) => void
5358
emitCursorUpdate: (cursor: { x: number; y: number }) => void
5459
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
5560
// Event handlers for receiving real-time updates
5661
onWorkflowOperation: (handler: (data: any) => void) => void
5762
onSubblockUpdate: (handler: (data: any) => void) => void
63+
onBatchSubblockUpdate: (handler: (data: any) => void) => void
5864
onCursorUpdate: (handler: (data: any) => void) => void
5965
onSelectionUpdate: (handler: (data: any) => void) => void
6066
onUserJoined: (handler: (data: any) => void) => void
@@ -75,10 +81,12 @@ const SocketContext = createContext<SocketContextType>({
7581
leaveWorkflow: () => {},
7682
emitWorkflowOperation: () => {},
7783
emitSubblockUpdate: () => {},
84+
emitBatchSubblockUpdate: () => {},
7885
emitCursorUpdate: () => {},
7986
emitSelectionUpdate: () => {},
8087
onWorkflowOperation: () => {},
8188
onSubblockUpdate: () => {},
89+
onBatchSubblockUpdate: () => {},
8290
onCursorUpdate: () => {},
8391
onSelectionUpdate: () => {},
8492
onUserJoined: () => {},
@@ -111,6 +119,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
111119
const eventHandlers = useRef<{
112120
workflowOperation?: (data: any) => void
113121
subblockUpdate?: (data: any) => void
122+
batchSubblockUpdate?: (data: any) => void
114123
cursorUpdate?: (data: any) => void
115124
selectionUpdate?: (data: any) => void
116125
userJoined?: (data: any) => void
@@ -289,6 +298,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
289298
eventHandlers.current.subblockUpdate?.(data)
290299
})
291300

301+
// Batch subblock update events
302+
socketInstance.on('batch-subblock-update', (data) => {
303+
eventHandlers.current.batchSubblockUpdate?.(data)
304+
})
305+
292306
// Workflow deletion events
293307
socketInstance.on('workflow-deleted', (data) => {
294308
logger.warn(`Workflow ${data.workflowId} has been deleted`)
@@ -694,6 +708,29 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
694708
[socket, currentWorkflowId]
695709
)
696710

711+
// Emit batch subblock value updates
712+
const emitBatchSubblockUpdate = useCallback(
713+
(blockId: string, subblockValues: Record<string, any>, operationId?: string) => {
714+
// Only emit if socket is connected and we're in a valid workflow room
715+
if (socket && currentWorkflowId) {
716+
socket.emit('batch-subblock-update', {
717+
blockId,
718+
subblockValues,
719+
timestamp: Date.now(),
720+
operationId, // Include operation ID for queue tracking
721+
})
722+
} else {
723+
logger.warn('Cannot emit batch subblock update: no socket connection or workflow room', {
724+
hasSocket: !!socket,
725+
currentWorkflowId,
726+
blockId,
727+
subblockCount: Object.keys(subblockValues).length,
728+
})
729+
}
730+
},
731+
[socket, currentWorkflowId]
732+
)
733+
697734
// Cursor throttling optimized for database connection health
698735
const lastCursorEmit = useRef(0)
699736
const emitCursorUpdate = useCallback(
@@ -729,6 +766,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
729766
eventHandlers.current.subblockUpdate = handler
730767
}, [])
731768

769+
const onBatchSubblockUpdate = useCallback((handler: (data: any) => void) => {
770+
eventHandlers.current.batchSubblockUpdate = handler
771+
}, [])
772+
732773
const onCursorUpdate = useCallback((handler: (data: any) => void) => {
733774
eventHandlers.current.cursorUpdate = handler
734775
}, [])
@@ -773,10 +814,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
773814
leaveWorkflow,
774815
emitWorkflowOperation,
775816
emitSubblockUpdate,
817+
emitBatchSubblockUpdate,
776818
emitCursorUpdate,
777819
emitSelectionUpdate,
778820
onWorkflowOperation,
779821
onSubblockUpdate,
822+
onBatchSubblockUpdate,
780823
onCursorUpdate,
781824
onSelectionUpdate,
782825
onUserJoined,

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ export function useCollaborativeWorkflow() {
2222
leaveWorkflow,
2323
emitWorkflowOperation,
2424
emitSubblockUpdate,
25+
emitBatchSubblockUpdate,
2526
onWorkflowOperation,
2627
onSubblockUpdate,
28+
onBatchSubblockUpdate,
2729
onUserJoined,
2830
onUserLeft,
2931
onWorkflowDeleted,
@@ -71,8 +73,13 @@ export function useCollaborativeWorkflow() {
7173

7274
// Register emit functions with operation queue store
7375
useEffect(() => {
74-
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
75-
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
76+
registerEmitFunctions(
77+
emitWorkflowOperation,
78+
emitSubblockUpdate,
79+
emitBatchSubblockUpdate,
80+
currentWorkflowId
81+
)
82+
}, [emitWorkflowOperation, emitSubblockUpdate, emitBatchSubblockUpdate, currentWorkflowId])
7683

7784
useEffect(() => {
7885
const handleWorkflowOperation = (data: any) => {
@@ -238,6 +245,29 @@ export function useCollaborativeWorkflow() {
238245
}
239246
}
240247

248+
const handleBatchSubblockUpdate = (data: any) => {
249+
const { blockId, subblockValues, userId } = data
250+
251+
if (isApplyingRemoteChange.current) return
252+
253+
logger.info(
254+
`Received batch subblock update from user ${userId}: ${blockId} (${Object.keys(subblockValues).length} subblocks)`
255+
)
256+
257+
isApplyingRemoteChange.current = true
258+
259+
try {
260+
// Apply all subblock values in batch
261+
Object.entries(subblockValues).forEach(([subblockId, value]) => {
262+
subBlockStore.setValue(blockId, subblockId, value)
263+
})
264+
} catch (error) {
265+
logger.error('Error applying remote batch subblock update:', error)
266+
} finally {
267+
isApplyingRemoteChange.current = false
268+
}
269+
}
270+
241271
const handleUserJoined = (data: any) => {
242272
logger.info(`User joined: ${data.userName}`)
243273
}
@@ -343,6 +373,7 @@ export function useCollaborativeWorkflow() {
343373
// Register event handlers
344374
onWorkflowOperation(handleWorkflowOperation)
345375
onSubblockUpdate(handleSubblockUpdate)
376+
onBatchSubblockUpdate(handleBatchSubblockUpdate)
346377
onUserJoined(handleUserJoined)
347378
onUserLeft(handleUserLeft)
348379
onWorkflowDeleted(handleWorkflowDeleted)
@@ -356,6 +387,7 @@ export function useCollaborativeWorkflow() {
356387
}, [
357388
onWorkflowOperation,
358389
onSubblockUpdate,
390+
onBatchSubblockUpdate,
359391
onUserJoined,
360392
onUserLeft,
361393
onWorkflowDeleted,
@@ -723,6 +755,43 @@ export function useCollaborativeWorkflow() {
723755
]
724756
)
725757

758+
const collaborativeBatchSetSubblockValues = useCallback(
759+
(blockId: string, subblockValues: Record<string, any>) => {
760+
if (isApplyingRemoteChange.current) return
761+
762+
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
763+
logger.debug('Skipping batch subblock update - not in active workflow', {
764+
currentWorkflowId,
765+
activeWorkflowId,
766+
blockId,
767+
subblockCount: Object.keys(subblockValues).length,
768+
})
769+
return
770+
}
771+
772+
// Generate operation ID for queue tracking
773+
const operationId = crypto.randomUUID()
774+
775+
// Add to queue for retry mechanism
776+
addToQueue({
777+
id: operationId,
778+
operation: {
779+
operation: 'batch-subblock-update',
780+
target: 'block',
781+
payload: { blockId, subblockValues },
782+
},
783+
workflowId: activeWorkflowId || '',
784+
userId: session?.user?.id || 'unknown',
785+
})
786+
787+
// Apply locally first (immediate UI feedback)
788+
Object.entries(subblockValues).forEach(([subblockId, value]) => {
789+
subBlockStore.setValue(blockId, subblockId, value)
790+
})
791+
},
792+
[subBlockStore, currentWorkflowId, activeWorkflowId, addToQueue, session?.user?.id]
793+
)
794+
726795
const collaborativeDuplicateBlock = useCallback(
727796
(sourceId: string) => {
728797
const sourceBlock = workflowStore.blocks[sourceId]
@@ -794,9 +863,7 @@ export function useCollaborativeWorkflow() {
794863

795864
const subBlockValues = subBlockStore.workflowValues[activeWorkflowId || '']?.[sourceId]
796865
if (subBlockValues && activeWorkflowId) {
797-
Object.entries(subBlockValues).forEach(([subblockId, value]) => {
798-
collaborativeSetSubblockValue(newId, subblockId, value)
799-
})
866+
collaborativeBatchSetSubblockValues(newId, subBlockValues)
800867
}
801868
})
802869
},
@@ -805,7 +872,7 @@ export function useCollaborativeWorkflow() {
805872
workflowStore,
806873
subBlockStore,
807874
activeWorkflowId,
808-
collaborativeSetSubblockValue,
875+
collaborativeBatchSetSubblockValues,
809876
]
810877
)
811878

apps/sim/socket-server/handlers/subblocks.ts

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,152 @@ export function setupSubblocksHandlers(
158158
})
159159
}
160160
})
161+
162+
socket.on('batch-subblock-update', async (data) => {
163+
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
164+
const session = roomManager.getUserSession(socket.id)
165+
166+
if (!workflowId || !session) {
167+
logger.debug(`Ignoring batch subblock update: socket not connected to any workflow room`, {
168+
socketId: socket.id,
169+
hasWorkflowId: !!workflowId,
170+
hasSession: !!session,
171+
})
172+
return
173+
}
174+
175+
const { blockId, subblockValues, timestamp, operationId } = data
176+
const room = roomManager.getWorkflowRoom(workflowId)
177+
178+
if (!room) {
179+
logger.debug(`Ignoring batch subblock update: workflow room not found`, {
180+
socketId: socket.id,
181+
workflowId,
182+
blockId,
183+
subblockCount: Object.keys(subblockValues).length,
184+
})
185+
return
186+
}
187+
188+
try {
189+
const userPresence = room.users.get(socket.id)
190+
if (userPresence) {
191+
userPresence.lastActivity = Date.now()
192+
}
193+
194+
// First, verify that the workflow still exists in the database
195+
const workflowExists = await db
196+
.select({ id: workflow.id })
197+
.from(workflow)
198+
.where(eq(workflow.id, workflowId))
199+
.limit(1)
200+
201+
if (workflowExists.length === 0) {
202+
logger.warn(`Ignoring batch subblock update: workflow ${workflowId} no longer exists`, {
203+
socketId: socket.id,
204+
blockId,
205+
subblockCount: Object.keys(subblockValues).length,
206+
})
207+
roomManager.cleanupUserFromRoom(socket.id, workflowId)
208+
return
209+
}
210+
211+
let updateSuccessful = false
212+
213+
await db.transaction(async (tx) => {
214+
// Get the current block
215+
const [block] = await tx
216+
.select({ subBlocks: workflowBlocks.subBlocks })
217+
.from(workflowBlocks)
218+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
219+
.limit(1)
220+
221+
if (!block) {
222+
logger.debug(`Block ${blockId} not found in workflow ${workflowId}`)
223+
return
224+
}
225+
226+
const subBlocks = (block.subBlocks as any) || {}
227+
228+
// Update all subblock values in batch
229+
for (const [subblockId, value] of Object.entries(subblockValues)) {
230+
if (!subBlocks[subblockId]) {
231+
// Create new subblock with minimal structure
232+
subBlocks[subblockId] = {
233+
id: subblockId,
234+
type: 'unknown', // Will be corrected by next collaborative update
235+
value: value,
236+
}
237+
} else {
238+
// Preserve existing id and type, only update value
239+
subBlocks[subblockId] = {
240+
...subBlocks[subblockId],
241+
value: value,
242+
}
243+
}
244+
}
245+
246+
await tx
247+
.update(workflowBlocks)
248+
.set({
249+
subBlocks: subBlocks,
250+
updatedAt: new Date(),
251+
})
252+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
253+
254+
updateSuccessful = true
255+
})
256+
257+
// Only broadcast to other clients if the update was successful
258+
if (updateSuccessful) {
259+
socket.to(workflowId).emit('batch-subblock-update', {
260+
blockId,
261+
subblockValues,
262+
timestamp,
263+
senderId: socket.id,
264+
userId: session.userId,
265+
})
266+
267+
// Emit confirmation if operationId is provided
268+
if (operationId) {
269+
socket.emit('operation-confirmed', {
270+
operationId,
271+
serverTimestamp: Date.now(),
272+
})
273+
}
274+
275+
logger.debug(
276+
`Batch subblock update in workflow ${workflowId}: ${blockId} (${Object.keys(subblockValues).length} subblocks)`
277+
)
278+
} else if (operationId) {
279+
// Block was deleted - notify client that operation completed (but didn't update anything)
280+
socket.emit('operation-failed', {
281+
operationId,
282+
error: 'Block no longer exists',
283+
retryable: false, // No point retrying for deleted blocks
284+
})
285+
}
286+
} catch (error) {
287+
logger.error('Error handling batch subblock update:', error)
288+
289+
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
290+
291+
// Emit operation-failed for queue-tracked operations
292+
if (operationId) {
293+
socket.emit('operation-failed', {
294+
operationId,
295+
error: errorMessage,
296+
retryable: true, // Batch subblock updates are generally retryable
297+
})
298+
}
299+
300+
// Also emit legacy operation-error for backward compatibility
301+
socket.emit('operation-error', {
302+
type: 'BATCH_SUBBLOCK_UPDATE_FAILED',
303+
message: `Failed to update batch subblocks for ${blockId}: ${errorMessage}`,
304+
operation: 'batch-subblock-update',
305+
target: 'block',
306+
})
307+
}
308+
})
161309
}

0 commit comments

Comments
 (0)