Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
import type { SubBlockConfig } from '@/blocks/types'
import { useTagSelection } from '@/hooks/use-tag-selection'
import { useOperationQueueStore } from '@/stores/operation-queue/store'

const logger = createLogger('LongInput')

Expand Down Expand Up @@ -382,11 +381,6 @@ export function LongInput({
onScroll={handleScroll}
onWheel={handleWheel}
onKeyDown={handleKeyDown}
onBlur={() => {
try {
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
} catch {}
}}
onFocus={() => {
setShowEnvVars(false)
setShowTags(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
import type { SubBlockConfig } from '@/blocks/types'
import { useTagSelection } from '@/hooks/use-tag-selection'
import { useOperationQueueStore } from '@/stores/operation-queue/store'

const logger = createLogger('ShortInput')

Expand Down Expand Up @@ -396,9 +395,6 @@ export function ShortInput({
onBlur={() => {
setIsFocused(false)
setShowEnvVars(false)
try {
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
} catch {}
}}
onDrop={handleDrop}
onDragOver={handleDragOver}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ interface UseSubBlockValueOptions {
/**
* Custom hook to get and set values for a sub-block in a workflow.
* Handles complex object values properly by using deep equality comparison.
* Includes automatic debouncing and explicit streaming mode for AI generation.
* Supports explicit streaming mode for AI generation.
*
* @param blockId The ID of the block containing the sub-block
* @param subBlockId The ID of the sub-block
Expand Down Expand Up @@ -181,7 +181,7 @@ export function useSubBlockValue<T = any>(
}
}

// Emit immediately - let the operation queue handle debouncing and deduplication
// Emit immediately; the client queue coalesces same-key ops and the server debounces
emitValue(valueCopy)

if (triggerWorkflowUpdate) {
Expand Down
11 changes: 0 additions & 11 deletions apps/sim/contexts/socket-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
`URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms`
)

try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
// Flush debounced updates for the old workflow before switching rooms
if (currentWorkflowId) {
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
} else {
useOperationQueueStore.getState().flushAllDebounced()
}
} catch {}

// Leave current workflow first if we're in one
if (currentWorkflowId) {
logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`)
Expand Down Expand Up @@ -583,7 +573,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
socket.emit('leave-workflow')
Expand Down
240 changes: 152 additions & 88 deletions apps/sim/socket-server/handlers/subblocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ import type { RoomManager } from '@/socket-server/rooms/manager'

const logger = createLogger('SubblocksHandlers')

type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
// Map operationId -> socketId to emit confirmations/failures to correct clients
opToSocket: Map<string, string>
}

// Keyed by `${workflowId}:${blockId}:${subblockId}`
const pendingSubblockUpdates = new Map<string, PendingSubblock>()

export function setupSubblocksHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
Expand Down Expand Up @@ -46,106 +56,44 @@ export function setupSubblocksHandlers(
userPresence.lastActivity = Date.now()
}

// First, verify that the workflow still exists in the database
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (workflowExists.length === 0) {
logger.warn(`Ignoring subblock update: workflow ${workflowId} no longer exists`, {
socketId: socket.id,
blockId,
subblockId,
})
roomManager.cleanupUserFromRoom(socket.id, workflowId)
return
}

let updateSuccessful = false
await db.transaction(async (tx) => {
const [block] = await tx
.select({ subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
.limit(1)

if (!block) {
// Block was deleted - this is a normal race condition in collaborative editing
logger.debug(
`Ignoring subblock update for deleted block: ${workflowId}/${blockId}.${subblockId}`
)
return
}

const subBlocks = (block.subBlocks as any) || {}

if (!subBlocks[subblockId]) {
// Create new subblock with minimal structure
subBlocks[subblockId] = {
id: subblockId,
type: 'unknown', // Will be corrected by next collaborative update
value: value,
}
} else {
// Preserve existing id and type, only update value
subBlocks[subblockId] = {
...subBlocks[subblockId],
value: value,
// Server-side debounce/coalesce by workflowId+blockId+subblockId
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
const existing = pendingSubblockUpdates.get(debouncedKey)
if (existing) {
clearTimeout(existing.timeout)
existing.latest = { blockId, subblockId, value, timestamp }
if (operationId) existing.opToSocket.set(operationId, socket.id)
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
const timeout = setTimeout(async () => {
const pending = pendingSubblockUpdates.get(debouncedKey)
if (pending) {
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
}

await tx
.update(workflowBlocks)
.set({
subBlocks: subBlocks,
updatedAt: new Date(),
})
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))

updateSuccessful = true
})

// Only broadcast to other clients if the update was successful
if (updateSuccessful) {
socket.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
senderId: socket.id,
userId: session.userId,
})

// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}

logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
} else if (operationId) {
// Block was deleted - notify client that operation completed (but didn't update anything)
socket.emit('operation-failed', {
operationId,
error: 'Block no longer exists',
retryable: false, // No point retrying for deleted blocks
}, 25)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,
opToSocket,
})
}
} catch (error) {
logger.error('Error handling subblock update:', error)

const errorMessage = error instanceof Error ? error.message : 'Unknown error'

// Emit operation-failed for queue-tracked operations
// Best-effort failure for the single operation if provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: true, // Subblock updates are generally retryable
retryable: true,
})
}

Expand All @@ -159,3 +107,119 @@ export function setupSubblocksHandlers(
}
})
}

async function flushSubblockUpdate(
workflowId: string,
pending: PendingSubblock,
roomManager: RoomManager
) {
const { blockId, subblockId, value, timestamp } = pending.latest
try {
// Verify workflow still exists
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}

let updateSuccessful = false
await db.transaction(async (tx) => {
const [block] = await tx
.select({ subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
.limit(1)

if (!block) {
return
}

const subBlocks = (block.subBlocks as any) || {}
if (!subBlocks[subblockId]) {
subBlocks[subblockId] = { id: subblockId, type: 'unknown', value }
} else {
subBlocks[subblockId] = { ...subBlocks[subblockId], value }
}

await tx
.update(workflowBlocks)
.set({ subBlocks, updatedAt: new Date() })
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))

updateSuccessful = true
})

if (updateSuccessful) {
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
// Get all sockets in the room
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
// Only emit to sockets that didn't send any of the coalesced ops
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
}
}
})
}
}

// Confirm all coalesced operationIds
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})

logger.debug(`Flushed subblock update ${workflowId}: ${blockId}.${subblockId}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing subblock update:', error)
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}
Loading