Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/sim/app/api/workflows/[id]/execute/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('Workflow Execution API Route', () => {
}))

vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
Expand Down Expand Up @@ -121,7 +121,7 @@ describe('Workflow Execution API Route', () => {
],
loops: {},
parallels: {},
isFromNormalizedTables: true,
isFromNormalizedTables: false, // Changed to false since it's from deployed state
}),
}))

Expand Down Expand Up @@ -516,7 +516,7 @@ describe('Workflow Execution API Route', () => {
}))

vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
Expand Down Expand Up @@ -550,7 +550,7 @@ describe('Workflow Execution API Route', () => {
],
loops: {},
parallels: {},
isFromNormalizedTables: true,
isFromNormalizedTables: false, // Changed to false since it's from deployed state
}),
}))

Expand Down
21 changes: 7 additions & 14 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
Expand Down Expand Up @@ -111,20 +111,13 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P
runningExecutions.add(executionKey)
logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`)

// Load workflow data from normalized tables
logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`)
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
// Load workflow data from deployed state for API executions
const deployedData = await loadDeployedWorkflowState(workflowId)

if (!normalizedData) {
throw new Error(
`Workflow ${workflowId} has no normalized data available. Ensure the workflow is properly saved to normalized tables.`
)
}

// Use normalized data as primary source
const { blocks, edges, loops, parallels } = normalizedData
logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`)
logger.debug(`[${requestId}] Normalized data loaded:`, {
// Use deployed data as primary source for API executions
const { blocks, edges, loops, parallels } = deployedData
logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`)
logger.debug(`[${requestId}] Deployed data loaded:`, {
blocksCount: Object.keys(blocks || {}).length,
edgesCount: (edges || []).length,
loopsCount: Object.keys(loops || {}).length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
const currentWorkflow = useCurrentWorkflow()
const currentBlock = currentWorkflow.getBlockById(id)

const isEnabled = currentBlock?.enabled ?? true
// In preview mode, use the blockState provided; otherwise use current workflow state
const isEnabled = data.isPreview
? (data.blockState?.enabled ?? true)
: (currentBlock?.enabled ?? true)

// Get diff status from the block itself (set by diff engine)
const diffStatus =
Expand Down
51 changes: 46 additions & 5 deletions apps/sim/lib/workflows/db-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import type { LoopConfig, WorkflowState } from '@/stores/workflows/workflow/types'
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { SUBFLOW_TYPES } from '@/stores/workflows/workflow/types'

const logger = createLogger('WorkflowDBHelpers')
Expand All @@ -12,7 +12,49 @@ export interface NormalizedWorkflowData {
edges: any[]
loops: Record<string, any>
parallels: Record<string, any>
isFromNormalizedTables: true // Flag to indicate this came from new tables
isFromNormalizedTables: boolean // Flag to indicate source (true = normalized tables, false = deployed state)
}

/**
* Load deployed workflow state for execution
* Returns deployed state if available, otherwise throws error
*/
export async function loadDeployedWorkflowState(
workflowId: string
): Promise<NormalizedWorkflowData> {
try {
// First check if workflow is deployed and get deployed state
const [workflowResult] = await db
.select({
isDeployed: workflow.isDeployed,
deployedState: workflow.deployedState,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (!workflowResult) {
throw new Error(`Workflow ${workflowId} not found`)
}

if (!workflowResult.isDeployed || !workflowResult.deployedState) {
throw new Error(`Workflow ${workflowId} is not deployed or has no deployed state`)
}

const deployedState = workflowResult.deployedState as any

// Convert deployed state to normalized format
return {
blocks: deployedState.blocks || {},
edges: deployedState.edges || [],
loops: deployedState.loops || {},
parallels: deployedState.parallels || {},
isFromNormalizedTables: false, // Flag to indicate this came from deployed state
}
} catch (error) {
logger.error(`Error loading deployed workflow state ${workflowId}:`, error)
throw error
}
}

/**
Expand Down Expand Up @@ -88,7 +130,6 @@ export async function loadWorkflowFromNormalizedTables(
const config = subflow.config || {}

if (subflow.type === SUBFLOW_TYPES.LOOP) {
const loopConfig = config as LoopConfig
loops[subflow.id] = {
id: subflow.id,
...config,
Expand Down Expand Up @@ -126,7 +167,7 @@ export async function saveWorkflowToNormalizedTables(
): Promise<{ success: boolean; jsonBlob?: any; error?: string }> {
try {
// Start a transaction
const result = await db.transaction(async (tx) => {
await db.transaction(async (tx) => {
// Clear existing data for this workflow
await Promise.all([
tx.delete(workflowBlocks).where(eq(workflowBlocks.workflowId, workflowId)),
Expand Down
14 changes: 4 additions & 10 deletions apps/sim/trigger/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment as environmentTable, userStats } from '@/db/schema'
Expand Down Expand Up @@ -60,16 +60,10 @@ export const workflowExecution = task({
)
}

// Load workflow data from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalizedData) {
logger.error(`[${requestId}] Workflow not found in normalized tables: ${workflowId}`)
throw new Error(`Workflow ${workflowId} data not found in normalized tables`)
}

logger.info(`[${requestId}] Workflow loaded successfully: ${workflowId}`)
// Load workflow data from deployed state (this task is only used for API executions right now)
const workflowData = await loadDeployedWorkflowState(workflowId)

const { blocks, edges, loops, parallels } = normalizedData
const { blocks, edges, loops, parallels } = workflowData

// Merge subblock states (server-safe version doesn't need workflowId)
const mergedStates = mergeSubblockState(blocks, {})
Expand Down