Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ export function useSubBlockValue<T = any>(
return
}

const currentActiveWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!currentActiveWorkflowId) {
logger.warn('No active workflow ID when setting value', { blockId, subBlockId })
return
}

// Use deep comparison to avoid unnecessary updates for complex objects
if (!isEqual(valueRef.current, newValue)) {
valueRef.current = newValue
Expand All @@ -147,10 +153,10 @@ export function useSubBlockValue<T = any>(
useSubBlockStore.setState((state) => ({
workflowValues: {
...state.workflowValues,
[activeWorkflowId || '']: {
...state.workflowValues[activeWorkflowId || ''],
[currentActiveWorkflowId]: {
...state.workflowValues[currentActiveWorkflowId],
[blockId]: {
...state.workflowValues[activeWorkflowId || '']?.[blockId],
...state.workflowValues[currentActiveWorkflowId]?.[blockId],
[subBlockId]: newValue,
},
},
Expand Down Expand Up @@ -194,7 +200,6 @@ export function useSubBlockValue<T = any>(
isStreaming,
emitValue,
isShowingDiff,
activeWorkflowId,
]
)

Expand Down
229 changes: 86 additions & 143 deletions apps/sim/contexts/socket-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -327,91 +327,97 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
})

// Shared function to rehydrate workflow stores
const rehydrateWorkflowStores = async (
workflowId: string,
workflowState: any,
source: 'copilot' | 'workflow-state'
) => {
// Import stores dynamically
const [
{ useOperationQueueStore },
{ useWorkflowRegistry },
{ useWorkflowStore },
{ useSubBlockStore },
] = await Promise.all([
import('@/stores/operation-queue/store'),
import('@/stores/workflows/registry/store'),
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
])

// Only proceed if this is the active workflow
const { activeWorkflowId } = useWorkflowRegistry.getState()
if (activeWorkflowId !== workflowId) {
logger.info(`Skipping rehydration - workflow ${workflowId} is not active`)
return false
}

// Check for pending operations
const hasPending = useOperationQueueStore
.getState()
.operations.some((op: any) => op.workflowId === workflowId && op.status !== 'confirmed')
if (hasPending) {
logger.info(`Skipping ${source} rehydration due to pending operations in queue`)
return false
}

// Extract subblock values from blocks
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})

// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})

// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))

logger.info(`Successfully rehydrated stores from ${source}`)
return true
}

// Copilot workflow edit events (database has been updated, rehydrate stores)
socketInstance.on('copilot-workflow-edit', async (data) => {
logger.info(
`Copilot edited workflow ${data.workflowId} - rehydrating stores from database`
)

if (data.workflowId === urlWorkflowId) {
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
const hasPending = useOperationQueueStore
.getState()
.operations.some(
(op: any) => op.workflowId === data.workflowId && op.status !== 'confirmed'
)
if (hasPending) {
logger.info('Skipping copilot rehydration due to pending operations in queue')
return
}
} catch {}
try {
// Fetch fresh workflow state directly from API
const response = await fetch(`/api/workflows/${data.workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data

if (workflowData?.state) {
logger.info('Rehydrating stores with fresh workflow state from database')

// Import stores dynamically to avoid import issues
Promise.all([
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
])
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
const workflowState = workflowData.state

// Extract subblock values from blocks
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(
([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
}
)
})

// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})

// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[data.workflowId]: subblockValues,
},
}))

// Note: Auto-layout is already handled by the copilot backend before saving
// No need to trigger additional auto-layout here to avoid ID conflicts

logger.info('Successfully rehydrated stores from database after copilot edit')
})
.catch((error) => {
logger.error('Failed to import stores for copilot rehydration:', error)
})
}
} else {
logger.error('Failed to fetch fresh workflow state:', response.statusText)
try {
// Fetch fresh workflow state directly from API
const response = await fetch(`/api/workflows/${data.workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data

if (workflowData?.state) {
await rehydrateWorkflowStores(data.workflowId, workflowData.state, 'copilot')
}
} catch (error) {
logger.error('Failed to rehydrate stores after copilot edit:', error)
} else {
logger.error('Failed to fetch fresh workflow state:', response.statusText)
}
} catch (error) {
logger.error('Failed to rehydrate stores after copilot edit:', error)
}
})

Expand Down Expand Up @@ -465,74 +471,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.debug('Operation confirmed:', data)
})

socketInstance.on('workflow-state', (workflowData) => {
socketInstance.on('workflow-state', async (workflowData) => {
logger.info('Received workflow state from server')

// Update local stores with the fresh workflow state (same logic as YAML editor)
if (workflowData?.state && workflowData.id === urlWorkflowId) {
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
const hasPending = useOperationQueueStore
.getState()
.operations.some(
(op: any) => op.workflowId === workflowData.id && op.status !== 'confirmed'
)
if (hasPending) {
logger.info(
'Skipping workflow-state rehydration due to pending operations in queue'
)
return
}
} catch {}
logger.info('Updating local stores with fresh workflow state from server')

try {
Promise.all([
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
import('@/stores/workflows/registry/store'),
])
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
const workflowState = workflowData.state

const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})

// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})

// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowData.id]: subblockValues,
},
}))

logger.info('Merged fresh workflow state with local state')
})
.catch((error) => {
logger.error('Failed to import stores for workflow state update:', error)
})
} catch (error) {
logger.error('Failed to update local stores with workflow state:', error)
}
if (workflowData?.state) {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
}
})

Expand Down