Skip to content

Commit 9097c52

Browse files
authored
fix(sockets): added debouncing for sub-block values to prevent overloading socket server, fixed persistence issue during streaming back from LLM response format, removed unused events (#642)
* fix(sockets): added debouncing for sub-block values to prevent overloading socket server, fixed persistence issue during streaming back from LLM response format, removed unused events * reuse existing isStreaming state for code block llm-generated response format
1 parent 2ce68ae commit 9097c52

File tree

5 files changed

+164
-41
lines changed

5 files changed

+164
-41
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ export function Code({
7373
}
7474
}, [generationType])
7575

76-
// State management
77-
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId)
7876
const [code, setCode] = useState<string>('')
7977
const [_lineCount, setLineCount] = useState(1)
8078
const [showTags, setShowTags] = useState(false)
@@ -98,53 +96,69 @@ export function Code({
9896
const toggleCollapsed = () => {
9997
setCollapsedValue(blockId, collapsedStateKey, !isCollapsed)
10098
}
99+
100+
// Create refs to hold the handlers
101+
const handleStreamStartRef = useRef<() => void>(() => {})
102+
const handleGeneratedContentRef = useRef<(generatedCode: string) => void>(() => {})
103+
const handleStreamChunkRef = useRef<(chunk: string) => void>(() => {})
104+
105+
// AI Code Generation Hook
106+
const {
107+
isLoading: isAiLoading,
108+
isStreaming: isAiStreaming,
109+
generate: generateCode,
110+
generateStream: generateCodeStream,
111+
cancelGeneration,
112+
isPromptVisible,
113+
showPromptInline,
114+
hidePromptInline,
115+
promptInputValue,
116+
updatePromptValue,
117+
} = useCodeGeneration({
118+
generationType: generationType,
119+
initialContext: code,
120+
onGeneratedContent: (content: string) => handleGeneratedContentRef.current?.(content),
121+
onStreamChunk: (chunk: string) => handleStreamChunkRef.current?.(chunk),
122+
onStreamStart: () => handleStreamStartRef.current?.(),
123+
})
124+
125+
// State management - useSubBlockValue with explicit streaming control
126+
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, {
127+
debounceMs: 150,
128+
isStreaming: isAiStreaming, // Use AI streaming state directly
129+
onStreamingEnd: () => {
130+
logger.debug('AI streaming ended, value persisted', { blockId, subBlockId })
131+
},
132+
})
133+
101134
// Use preview value when in preview mode, otherwise use store value or prop value
102135
const value = isPreview ? previewValue : propValue !== undefined ? propValue : storeValue
103136

104-
// AI Code Generation Hook
105-
const handleStreamStart = () => {
137+
// Define the handlers now that we have access to setStoreValue
138+
handleStreamStartRef.current = () => {
106139
setCode('')
107-
// Optionally clear the store value too, though handleStreamChunk will update it
108-
// setStoreValue('')
140+
// Streaming state is now controlled by isAiStreaming
109141
}
110142

111-
const handleGeneratedContent = (generatedCode: string) => {
143+
handleGeneratedContentRef.current = (generatedCode: string) => {
112144
setCode(generatedCode)
113145
if (!isPreview && !disabled) {
114146
setStoreValue(generatedCode)
147+
// Final value will be persisted when isAiStreaming becomes false
115148
}
116149
}
117150

118-
// Handle streaming chunks directly into the editor
119-
const handleStreamChunk = (chunk: string) => {
151+
handleStreamChunkRef.current = (chunk: string) => {
120152
setCode((currentCode) => {
121153
const newCode = currentCode + chunk
122154
if (!isPreview && !disabled) {
155+
// Update the value - it won't be persisted until streaming ends
123156
setStoreValue(newCode)
124157
}
125158
return newCode
126159
})
127160
}
128161

129-
const {
130-
isLoading: isAiLoading,
131-
isStreaming: isAiStreaming,
132-
generate: generateCode,
133-
generateStream: generateCodeStream,
134-
cancelGeneration,
135-
isPromptVisible,
136-
showPromptInline,
137-
hidePromptInline,
138-
promptInputValue,
139-
updatePromptValue,
140-
} = useCodeGeneration({
141-
generationType: generationType,
142-
initialContext: code,
143-
onGeneratedContent: handleGeneratedContent,
144-
onStreamChunk: handleStreamChunk,
145-
onStreamStart: handleStreamStart,
146-
})
147-
148162
// Effects
149163
useEffect(() => {
150164
const valueString = value?.toString() ?? ''

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/response/response-format.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ export function ResponseFormat({
5050
isPreview = false,
5151
previewValue,
5252
}: ResponseFormatProps) {
53-
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId)
53+
// useSubBlockValue now includes debouncing by default
54+
const [storeValue, setStoreValue] = useSubBlockValue<JSONProperty[]>(blockId, subBlockId, false, {
55+
debounceMs: 200, // Slightly longer debounce for complex structures
56+
})
57+
5458
const [showPreview, setShowPreview] = useState(false)
5559

5660
const value = isPreview ? previewValue : storeValue

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 117 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import { useCallback, useEffect, useRef } from 'react'
22
import { isEqual } from 'lodash'
3+
import { createLogger } from '@/lib/logs/console-logger'
34
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
45
import { getProviderFromModel } from '@/providers/utils'
56
import { useGeneralStore } from '@/stores/settings/general/store'
7+
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
68
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
79
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
810

11+
const logger = createLogger('SubBlockValue')
12+
913
// Helper function to dispatch collaborative subblock updates
1014
const dispatchSubblockUpdate = (blockId: string, subBlockId: string, value: any) => {
1115
const event = new CustomEvent('update-subblock-value', {
@@ -154,20 +158,31 @@ function storeApiKeyValue(
154158
}
155159
}
156160

161+
interface UseSubBlockValueOptions {
162+
debounceMs?: number
163+
isStreaming?: boolean // Explicit streaming state
164+
onStreamingEnd?: () => void
165+
}
166+
157167
/**
158168
* Custom hook to get and set values for a sub-block in a workflow.
159169
* Handles complex object values properly by using deep equality comparison.
170+
* Includes automatic debouncing and explicit streaming mode for AI generation.
160171
*
161172
* @param blockId The ID of the block containing the sub-block
162173
* @param subBlockId The ID of the sub-block
163174
* @param triggerWorkflowUpdate Whether to trigger a workflow update when the value changes
164-
* @returns A tuple containing the current value and a setter function
175+
* @param options Configuration for debouncing and streaming behavior
176+
* @returns A tuple containing the current value and setter function
165177
*/
166178
export function useSubBlockValue<T = any>(
167179
blockId: string,
168180
subBlockId: string,
169-
triggerWorkflowUpdate = false
181+
triggerWorkflowUpdate = false,
182+
options?: UseSubBlockValueOptions
170183
): readonly [T | null, (value: T) => void] {
184+
const { debounceMs = 150, isStreaming = false, onStreamingEnd } = options || {}
185+
171186
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
172187

173188
const blockType = useWorkflowStore(
@@ -187,6 +202,12 @@ export function useSubBlockValue<T = any>(
187202
// Previous model reference for detecting model changes
188203
const prevModelRef = useRef<string | null>(null)
189204

205+
// Debouncing refs
206+
const debounceTimerRef = useRef<NodeJS.Timeout | null>(null)
207+
const lastEmittedValueRef = useRef<T | null>(null)
208+
const streamingValueRef = useRef<T | null>(null)
209+
const wasStreamingRef = useRef<boolean>(false)
210+
190211
// Get value from subblock store - always call this hook unconditionally
191212
const storeValue = useSubBlockStore(
192213
useCallback((state) => state.getValue(blockId, subBlockId), [blockId, subBlockId])
@@ -211,13 +232,59 @@ export function useSubBlockValue<T = any>(
211232
// Compute the modelValue based on block type
212233
const modelValue = isProviderBasedBlock ? (modelSubBlockValue as string) : null
213234

235+
// Cleanup timer on unmount
236+
useEffect(() => {
237+
return () => {
238+
if (debounceTimerRef.current) {
239+
clearTimeout(debounceTimerRef.current)
240+
}
241+
}
242+
}, [])
243+
244+
// Emit the value to socket/DB
245+
const emitValue = useCallback(
246+
(value: T) => {
247+
collaborativeSetSubblockValue(blockId, subBlockId, value)
248+
lastEmittedValueRef.current = value
249+
},
250+
[blockId, subBlockId, collaborativeSetSubblockValue]
251+
)
252+
253+
// Handle streaming mode changes
254+
useEffect(() => {
255+
// If we just exited streaming mode, emit the final value
256+
if (wasStreamingRef.current && !isStreaming && streamingValueRef.current !== null) {
257+
logger.debug('Streaming ended, persisting final value', { blockId, subBlockId })
258+
emitValue(streamingValueRef.current)
259+
streamingValueRef.current = null
260+
onStreamingEnd?.()
261+
}
262+
wasStreamingRef.current = isStreaming
263+
}, [isStreaming, blockId, subBlockId, emitValue, onStreamingEnd])
264+
214265
// Hook to set a value in the subblock store
215266
const setValue = useCallback(
216267
(newValue: T) => {
217268
// Use deep comparison to avoid unnecessary updates for complex objects
218269
if (!isEqual(valueRef.current, newValue)) {
219270
valueRef.current = newValue
220271

272+
// Always update local store immediately for UI responsiveness
273+
useSubBlockStore.setState((state) => ({
274+
workflowValues: {
275+
...state.workflowValues,
276+
[useWorkflowRegistry.getState().activeWorkflowId || '']: {
277+
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || ''],
278+
[blockId]: {
279+
...state.workflowValues[useWorkflowRegistry.getState().activeWorkflowId || '']?.[
280+
blockId
281+
],
282+
[subBlockId]: newValue,
283+
},
284+
},
285+
},
286+
}))
287+
221288
// Ensure we're passing the actual value, not a reference that might change
222289
const valueCopy =
223290
newValue === null
@@ -231,8 +298,27 @@ export function useSubBlockValue<T = any>(
231298
storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue)
232299
}
233300

234-
// Use collaborative function which handles both local store update and socket emission
235-
collaborativeSetSubblockValue(blockId, subBlockId, valueCopy)
301+
// Clear any existing debounce timer
302+
if (debounceTimerRef.current) {
303+
clearTimeout(debounceTimerRef.current)
304+
debounceTimerRef.current = null
305+
}
306+
307+
// If streaming, just store the value without emitting
308+
if (isStreaming) {
309+
streamingValueRef.current = valueCopy
310+
} else {
311+
// Detect large changes for extended debounce
312+
const isLargeChange = detectLargeChange(lastEmittedValueRef.current, valueCopy)
313+
const effectiveDebounceMs = isLargeChange ? debounceMs * 2 : debounceMs
314+
315+
// Debounce the socket emission
316+
debounceTimerRef.current = setTimeout(() => {
317+
if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) {
318+
emitValue(valueCopy)
319+
}
320+
}, effectiveDebounceMs)
321+
}
236322

237323
if (triggerWorkflowUpdate) {
238324
useWorkflowStore.getState().triggerUpdate()
@@ -247,7 +333,9 @@ export function useSubBlockValue<T = any>(
247333
storeValue,
248334
triggerWorkflowUpdate,
249335
modelValue,
250-
collaborativeSetSubblockValue,
336+
isStreaming,
337+
debounceMs,
338+
emitValue,
251339
]
252340
)
253341

@@ -320,5 +408,29 @@ export function useSubBlockValue<T = any>(
320408
}
321409
}, [storeValue, initialValue])
322410

411+
// Return appropriate tuple based on whether options were provided
323412
return [storeValue !== undefined ? storeValue : initialValue, setValue] as const
324413
}
414+
415+
// Helper function to detect large changes
416+
function detectLargeChange(oldValue: any, newValue: any): boolean {
417+
// Handle null/undefined
418+
if (oldValue == null && newValue == null) return false
419+
if (oldValue == null || newValue == null) return true
420+
421+
// For strings, check if it's a large paste or deletion
422+
if (typeof oldValue === 'string' && typeof newValue === 'string') {
423+
const sizeDiff = Math.abs(newValue.length - oldValue.length)
424+
// Consider it a large change if more than 50 characters changed at once
425+
return sizeDiff > 50
426+
}
427+
428+
// For arrays, check length difference
429+
if (Array.isArray(oldValue) && Array.isArray(newValue)) {
430+
const sizeDiff = Math.abs(newValue.length - oldValue.length)
431+
return sizeDiff > 5
432+
}
433+
434+
// For other types, always treat as small change
435+
return false
436+
}

apps/sim/socket-server/handlers/connection.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,5 @@ export function setupConnectionHandlers(
2828
roomManager.cleanupUserFromRoom(socket.id, workflowId)
2929
roomManager.broadcastPresenceUpdate(workflowId)
3030
}
31-
32-
roomManager.clearPendingOperations(socket.id)
3331
})
3432
}

apps/sim/socket-server/rooms/manager.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,6 @@ export class RoomManager {
7575
this.userSessions.delete(socketId)
7676
}
7777

78-
// This would be used if we implement operation queuing
79-
clearPendingOperations(socketId: string) {
80-
logger.debug(`Cleared pending operations for socket ${socketId}`)
81-
}
82-
8378
handleWorkflowDeletion(workflowId: string) {
8479
logger.info(`Handling workflow deletion notification for ${workflowId}`)
8580

0 commit comments

Comments
 (0)