Skip to content

Commit d3c1fb4

Browse files
committed
fix(sockets): added debouncing for sub-block values to prevent overloading socket server, fixed persistence issue during streaming back from LLM response format, removed unused events
1 parent 2ce68ae commit d3c1fb4

File tree

5 files changed

+222
-19
lines changed

5 files changed

+222
-19
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,21 @@ export function Code({
7373
}
7474
}, [generationType])
7575

76-
// State management
77-
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId)
76+
const [storeValue, setStoreValue, { isStreaming }] = useSubBlockValue(
77+
blockId,
78+
subBlockId,
79+
false,
80+
{
81+
debounceMs: 150,
82+
onStreamingStart: () => {
83+
logger.debug('Streaming started for code editing', { blockId, subBlockId })
84+
},
85+
onStreamingEnd: () => {
86+
logger.debug('Streaming ended for code editing', { blockId, subBlockId })
87+
},
88+
}
89+
)
90+
7891
const [code, setCode] = useState<string>('')
7992
const [_lineCount, setLineCount] = useState(1)
8093
const [showTags, setShowTags] = useState(false)
@@ -104,8 +117,7 @@ export function Code({
104117
// AI Code Generation Hook
105118
const handleStreamStart = () => {
106119
setCode('')
107-
// Optionally clear the store value too, though handleStreamChunk will update it
108-
// setStoreValue('')
120+
// No need to manually manage streaming - it's automatic now
109121
}
110122

111123
const handleGeneratedContent = (generatedCode: string) => {
@@ -120,6 +132,7 @@ export function Code({
120132
setCode((currentCode) => {
121133
const newCode = currentCode + chunk
122134
if (!isPreview && !disabled) {
135+
// Just update the value - streaming detection is automatic
123136
setStoreValue(newCode)
124137
}
125138
return newCode

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,16 @@ export function ResponseFormat({
5050
isPreview = false,
5151
previewValue,
5252
}: ResponseFormatProps) {
53-
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId)
53+
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId, false, {
54+
debounceMs: 200, // Slightly longer debounce for complex structures
55+
onStreamingStart: () => {
56+
console.debug('Starting bulk property operation', { blockId, subBlockId })
57+
},
58+
onStreamingEnd: () => {
59+
console.debug('Ending bulk property operation', { blockId, subBlockId })
60+
},
61+
})
62+
5463
const [showPreview, setShowPreview] = useState(false)
5564

5665
const value = isPreview ? previewValue : storeValue

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 195 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
import { useCallback, useEffect, useRef } from 'react'
1+
import { useCallback, useEffect, useRef, useState } from 'react'
22
import { isEqual } from 'lodash'
3+
import { createLogger } from '@/lib/logs/console-logger'
34
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
45
import { getProviderFromModel } from '@/providers/utils'
56
import { useGeneralStore } from '@/stores/settings/general/store'
7+
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
68
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
79
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
810

11+
const logger = createLogger('SubBlockValue')
12+
913
// Helper function to dispatch collaborative subblock updates
1014
const dispatchSubblockUpdate = (blockId: string, subBlockId: string, value: any) => {
1115
const event = new CustomEvent('update-subblock-value', {
@@ -154,20 +158,52 @@ function storeApiKeyValue(
154158
}
155159
}
156160

161+
interface UseSubBlockValueOptions {
162+
debounceMs?: number
163+
streamingThresholdMs?: number
164+
onStreamingStart?: () => void
165+
onStreamingEnd?: () => void
166+
}
167+
157168
/**
158169
* Custom hook to get and set values for a sub-block in a workflow.
159170
* Handles complex object values properly by using deep equality comparison.
171+
* Includes automatic debouncing and streaming detection for large text operations.
160172
*
161173
* @param blockId The ID of the block containing the sub-block
162174
* @param subBlockId The ID of the sub-block
163175
* @param triggerWorkflowUpdate Whether to trigger a workflow update when the value changes
164-
* @returns A tuple containing the current value and a setter function
176+
* @param options Configuration for debouncing and streaming behavior
177+
* @returns A tuple containing the current value, setter function, and optionally streaming state
165178
*/
166179
export function useSubBlockValue<T = any>(
167180
blockId: string,
168181
subBlockId: string,
169-
triggerWorkflowUpdate = false
170-
): readonly [T | null, (value: T) => void] {
182+
triggerWorkflowUpdate?: boolean
183+
): readonly [T | null, (value: T) => void]
184+
185+
export function useSubBlockValue<T = any>(
186+
blockId: string,
187+
subBlockId: string,
188+
triggerWorkflowUpdate: boolean,
189+
options: UseSubBlockValueOptions
190+
): readonly [T | null, (value: T) => void, { isStreaming: boolean }]
191+
192+
export function useSubBlockValue<T = any>(
193+
blockId: string,
194+
subBlockId: string,
195+
triggerWorkflowUpdate = false,
196+
options?: UseSubBlockValueOptions
197+
):
198+
| readonly [T | null, (value: T) => void]
199+
| readonly [T | null, (value: T) => void, { isStreaming: boolean }] {
200+
const {
201+
debounceMs = 100,
202+
streamingThresholdMs = 50,
203+
onStreamingStart,
204+
onStreamingEnd,
205+
} = options || {}
206+
171207
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
172208

173209
const blockType = useWorkflowStore(
@@ -187,6 +223,14 @@ export function useSubBlockValue<T = any>(
187223
// Previous model reference for detecting model changes
188224
const prevModelRef = useRef<string | null>(null)
189225

226+
// Debouncing and streaming detection refs
227+
const debounceTimerRef = useRef<NodeJS.Timeout | null>(null)
228+
const lastUpdateTimeRef = useRef<number>(0)
229+
const updateCountRef = useRef<number>(0)
230+
const streamingTimeoutRef = useRef<NodeJS.Timeout | null>(null)
231+
const lastEmittedValueRef = useRef<T | null>(null)
232+
const [isStreaming, setIsStreaming] = useState(false)
233+
190234
// Get value from subblock store - always call this hook unconditionally
191235
const storeValue = useSubBlockStore(
192236
useCallback((state) => state.getValue(blockId, subBlockId), [blockId, subBlockId])
@@ -211,13 +255,104 @@ export function useSubBlockValue<T = any>(
211255
// Compute the modelValue based on block type
212256
const modelValue = isProviderBasedBlock ? (modelSubBlockValue as string) : null
213257

258+
// Cleanup timers on unmount
259+
useEffect(() => {
260+
return () => {
261+
if (debounceTimerRef.current) {
262+
clearTimeout(debounceTimerRef.current)
263+
}
264+
if (streamingTimeoutRef.current) {
265+
clearTimeout(streamingTimeoutRef.current)
266+
}
267+
}
268+
}, [])
269+
270+
// Emit the value to socket/DB
271+
const emitValue = useCallback(
272+
(value: T) => {
273+
collaborativeSetSubblockValue(blockId, subBlockId, value)
274+
lastEmittedValueRef.current = value
275+
},
276+
[blockId, subBlockId, collaborativeSetSubblockValue]
277+
)
278+
279+
// Detect and handle streaming
280+
const detectStreaming = useCallback(() => {
281+
const now = Date.now()
282+
const timeSinceLastUpdate = now - lastUpdateTimeRef.current
283+
284+
// If updates are coming in rapidly, we're likely streaming
285+
if (timeSinceLastUpdate < streamingThresholdMs) {
286+
updateCountRef.current++
287+
288+
// Start streaming mode after 3 rapid updates
289+
if (updateCountRef.current >= 3 && !isStreaming) {
290+
logger.debug('Streaming detected', {
291+
blockId,
292+
subBlockId,
293+
updateCount: updateCountRef.current,
294+
})
295+
setIsStreaming(true)
296+
onStreamingStart?.()
297+
}
298+
} else {
299+
// Reset counter if updates slow down
300+
updateCountRef.current = 1
301+
}
302+
303+
lastUpdateTimeRef.current = now
304+
305+
// Set up timeout to end streaming
306+
if (streamingTimeoutRef.current) {
307+
clearTimeout(streamingTimeoutRef.current)
308+
}
309+
310+
if (isStreaming) {
311+
streamingTimeoutRef.current = setTimeout(() => {
312+
logger.debug('Ending streaming mode', { blockId, subBlockId })
313+
setIsStreaming(false)
314+
updateCountRef.current = 0
315+
onStreamingEnd?.()
316+
317+
// Emit the final value when streaming ends
318+
if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) {
319+
emitValue(valueRef.current)
320+
}
321+
}, 300) // End streaming 300ms after last update
322+
}
323+
}, [
324+
blockId,
325+
subBlockId,
326+
isStreaming,
327+
streamingThresholdMs,
328+
onStreamingStart,
329+
onStreamingEnd,
330+
emitValue,
331+
])
332+
214333
// Hook to set a value in the subblock store
215334
const setValue = useCallback(
216335
(newValue: T) => {
217336
// Use deep comparison to avoid unnecessary updates for complex objects
218337
if (!isEqual(valueRef.current, newValue)) {
219338
valueRef.current = newValue
220339

340+
// Always update local store immediately for UI responsiveness
341+
useSubBlockStore.setState((state) => ({
342+
workflowValues: {
343+
...state.workflowValues,
344+
[useWorkflowRegistry.getState().activeWorkflowId || '']: {
345+
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || ''],
346+
[blockId]: {
347+
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || '']?.[
348+
blockId
349+
],
350+
[subBlockId]: newValue,
351+
},
352+
},
353+
},
354+
}))
355+
221356
// Ensure we're passing the actual value, not a reference that might change
222357
const valueCopy =
223358
newValue === null
@@ -231,8 +366,27 @@ export function useSubBlockValue<T = any>(
231366
storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue)
232367
}
233368

234-
// Use collaborative function which handles both local store update and socket emission
235-
collaborativeSetSubblockValue(blockId, subBlockId, valueCopy)
369+
// Detect if we're in a streaming scenario
370+
detectStreaming()
371+
372+
// Clear any existing debounce timer
373+
if (debounceTimerRef.current) {
374+
clearTimeout(debounceTimerRef.current)
375+
}
376+
377+
// If streaming, don't emit immediately - wait for streaming to end
378+
if (!isStreaming) {
379+
// Detect large changes for automatic bulk operation mode
380+
const isLargeChange = detectLargeChange(lastEmittedValueRef.current, valueCopy)
381+
const effectiveDebounceMs = isLargeChange ? debounceMs * 2 : debounceMs
382+
383+
// Debounce the socket emission
384+
debounceTimerRef.current = setTimeout(() => {
385+
if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) {
386+
emitValue(valueCopy)
387+
}
388+
}, effectiveDebounceMs)
389+
}
236390

237391
if (triggerWorkflowUpdate) {
238392
useWorkflowStore.getState().triggerUpdate()
@@ -247,7 +401,10 @@ export function useSubBlockValue<T = any>(
247401
storeValue,
248402
triggerWorkflowUpdate,
249403
modelValue,
250-
collaborativeSetSubblockValue,
404+
isStreaming,
405+
debounceMs,
406+
detectStreaming,
407+
emitValue,
251408
]
252409
)
253410

@@ -320,5 +477,36 @@ export function useSubBlockValue<T = any>(
320477
}
321478
}, [storeValue, initialValue])
322479

480+
// Return appropriate tuple based on whether options were provided
481+
if (options) {
482+
return [
483+
storeValue !== undefined ? storeValue : initialValue,
484+
setValue,
485+
{ isStreaming },
486+
] as const
487+
}
323488
return [storeValue !== undefined ? storeValue : initialValue, setValue] as const
324489
}
490+
491+
// Helper function to detect large changes
492+
function detectLargeChange(oldValue: any, newValue: any): boolean {
493+
// Handle null/undefined
494+
if (oldValue == null && newValue == null) return false
495+
if (oldValue == null || newValue == null) return true
496+
497+
// For strings, check if it's a large paste or deletion
498+
if (typeof oldValue === 'string' && typeof newValue === 'string') {
499+
const sizeDiff = Math.abs(newValue.length - oldValue.length)
500+
// Consider it a large change if more than 50 characters changed at once
501+
return sizeDiff > 50
502+
}
503+
504+
// For arrays, check length difference
505+
if (Array.isArray(oldValue) && Array.isArray(newValue)) {
506+
const sizeDiff = Math.abs(newValue.length - oldValue.length)
507+
return sizeDiff > 5
508+
}
509+
510+
// For other types, always treat as small change
511+
return false
512+
}

apps/sim/socket-server/handlers/connection.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,5 @@ export function setupConnectionHandlers(
2828
roomManager.cleanupUserFromRoom(socket.id, workflowId)
2929
roomManager.broadcastPresenceUpdate(workflowId)
3030
}
31-
32-
roomManager.clearPendingOperations(socket.id)
3331
})
3432
}

apps/sim/socket-server/rooms/manager.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,6 @@ export class RoomManager {
7575
this.userSessions.delete(socketId)
7676
}
7777

78-
// This would be used if we implement operation queuing
79-
clearPendingOperations(socketId: string) {
80-
logger.debug(`Cleared pending operations for socket ${socketId}`)
81-
}
82-
8378
handleWorkflowDeletion(workflowId: string) {
8479
logger.info(`Handling workflow deletion notification for ${workflowId}`)
8580

0 commit comments

Comments
 (0)