Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions apps/sim/app/api/workflows/[id]/log/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,48 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const body = await request.json()
const { logs, executionId, result } = body

// If result is provided, use logging system for full tool call extraction
if (result) {
logger.info(`[${requestId}] Persisting execution result for workflow: ${id}`, {
executionId,
success: result.success,
})

// Check if this execution is from chat using only the explicit source flag
const isChatExecution = result.metadata?.source === 'chat'

// Also log to logging system
const triggerType = isChatExecution ? 'chat' : 'manual'
const loggingSession = new LoggingSession(id, executionId, triggerType, requestId)

const userId = validation.workflow.userId
const workspaceId = validation.workflow.workspaceId || ''

await loggingSession.safeStart({
userId: '', // TODO: Get from session
workspaceId: '', // TODO: Get from workflow
userId,
workspaceId,
variables: {},
})

// Build trace spans from execution logs
const { traceSpans } = buildTraceSpans(result)

await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
finalOutput: result.output || {},
traceSpans,
})
if (result.success === false) {
const message = result.error || 'Workflow execution failed'
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
error: { message },
})
} else {
const { traceSpans } = buildTraceSpans(result)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
finalOutput: result.output || {},
traceSpans,
})
}

return createSuccessResponse({
message: 'Execution logs persisted successfully',
})
}

// Fall back to the original log format if 'result' isn't provided
if (!logs || !Array.isArray(logs) || logs.length === 0) {
logger.warn(`[${requestId}] No logs provided for workflow: ${id}`)
return createErrorResponse('No logs provided', 400)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getBlock } from '@/blocks'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { Serializer, WorkflowValidationError } from '@/serializer'
import type { SerializedWorkflow } from '@/serializer/types'
import { useExecutionStore } from '@/stores/execution/store'
import { useConsoleStore } from '@/stores/panel/console/store'
Expand All @@ -16,6 +16,7 @@ import { useEnvironmentStore } from '@/stores/settings/environment/store'
import { useGeneralStore } from '@/stores/settings/general/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
import { useCurrentWorkflow } from './use-current-workflow'

const logger = createLogger('useWorkflowExecution')
Expand Down Expand Up @@ -488,7 +489,7 @@ export function useWorkflowExecution() {
}
return result
} catch (error: any) {
const errorResult = handleExecutionError(error)
const errorResult = handleExecutionError(error, { executionId })
persistLogs(executionId, errorResult).catch((err) => {
logger.error('Error persisting logs:', { error: err })
})
Expand Down Expand Up @@ -518,12 +519,7 @@ export function useWorkflowExecution() {
executionId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use currentWorkflow but check if we're in diff mode
const {
blocks: workflowBlocks,
edges: workflowEdges,
loops: workflowLoops,
parallels: workflowParallels,
} = currentWorkflow
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow

// Filter out blocks without type (these are layout-only blocks)
const validBlocks = Object.entries(workflowBlocks).reduce(
Expand Down Expand Up @@ -646,12 +642,17 @@ export function useWorkflowExecution() {
(edge) => !triggerBlockIds.includes(edge.source) && !triggerBlockIds.includes(edge.target)
)

// Create serialized workflow with filtered blocks and edges
// Derive subflows from the current filtered graph to avoid stale state
const runtimeLoops = generateLoopBlocks(filteredStates)
const runtimeParallels = generateParallelBlocks(filteredStates)

// Create serialized workflow with validation enabled
const workflow = new Serializer().serializeWorkflow(
filteredStates,
filteredEdges,
workflowLoops,
workflowParallels
runtimeLoops,
runtimeParallels,
true
)

// If this is a chat execution, get the selected outputs
Expand Down Expand Up @@ -690,7 +691,7 @@ export function useWorkflowExecution() {
return newExecutor.execute(activeWorkflowId || '')
}

const handleExecutionError = (error: any) => {
const handleExecutionError = (error: any, options?: { executionId?: string }) => {
let errorMessage = 'Unknown error'
if (error instanceof Error) {
errorMessage = error.message || `Error: ${String(error)}`
Expand Down Expand Up @@ -723,6 +724,36 @@ export function useWorkflowExecution() {
errorMessage = 'API request failed - no specific error details available'
}

// If we failed before creating an executor (e.g., serializer validation), add a console entry
if (!executor) {
try {
// Prefer attributing to specific subflow if we have a structured error
let blockId = 'serialization'
let blockName = 'Serialization'
let blockType = 'serializer'
if (error instanceof WorkflowValidationError) {
blockId = error.blockId || blockId
blockName = error.blockName || blockName
blockType = error.blockType || blockType
}

useConsoleStore.getState().addConsole({
input: {},
output: {},
success: false,
error: errorMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId,
executionId: options?.executionId,
blockName,
blockType,
})
} catch {}
}

const errorResult: ExecutionResult = {
success: false,
output: {},
Expand Down
46 changes: 44 additions & 2 deletions apps/sim/lib/logs/execution/logging-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,51 @@ export class LoggingSession {
return true
} catch (error) {
if (this.requestId) {
logger.error(`[${this.requestId}] Logging start failed:`, error)
logger.warn(
`[${this.requestId}] Logging start failed - falling back to minimal session:`,
error
)
}

// Fallback: create a minimal logging session without full workflow state
try {
const { userId, workspaceId, variables, triggerData } = params
this.trigger = createTriggerObject(this.triggerType, triggerData)
this.environment = createEnvironmentObject(
this.workflowId,
this.executionId,
userId,
workspaceId,
variables
)
// Minimal workflow state when normalized data is unavailable
this.workflowState = {
blocks: {},
edges: [],
loops: {},
parallels: {},
} as unknown as WorkflowState

await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,
workflowState: this.workflowState,
})

if (this.requestId) {
logger.debug(
`[${this.requestId}] Started minimal logging for execution ${this.executionId}`
)
}
return true
} catch (fallbackError) {
if (this.requestId) {
logger.error(`[${this.requestId}] Minimal logging start also failed:`, fallbackError)
}
return false
}
return false
}
}

Expand Down
113 changes: 113 additions & 0 deletions apps/sim/serializer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ import { getTool } from '@/tools/utils'

const logger = createLogger('Serializer')

/**
* Structured validation error for pre-execution workflow validation
*/
export class WorkflowValidationError extends Error {
constructor(
message: string,
public blockId?: string,
public blockType?: string,
public blockName?: string
) {
super(message)
this.name = 'WorkflowValidationError'
}
}

/**
* Helper function to check if a subblock should be included in serialization based on current mode
*/
Expand All @@ -29,6 +44,11 @@ export class Serializer {
parallels?: Record<string, Parallel>,
validateRequired = false
): SerializedWorkflow {
// Validate subflow requirements (loops/parallels) before serialization if requested
if (validateRequired) {
this.validateSubflowsBeforeExecution(blocks, loops || {}, parallels || {})
}

return {
version: '1.0',
blocks: Object.values(blocks).map((block) => this.serializeBlock(block, validateRequired)),
Expand All @@ -43,6 +63,99 @@ export class Serializer {
}
}

/**
* Validate loop and parallel subflows for required inputs when running in "each/collection" modes
*/
private validateSubflowsBeforeExecution(
blocks: Record<string, BlockState>,
loops: Record<string, Loop>,
parallels: Record<string, Parallel>
): void {
// Validate loops in forEach mode
Object.values(loops || {}).forEach((loop) => {
if (!loop) return
if (loop.loopType === 'forEach') {
const items = (loop as any).forEachItems

const hasNonEmptyCollection = (() => {
if (items === undefined || items === null) return false
if (Array.isArray(items)) return items.length > 0
if (typeof items === 'object') return Object.keys(items).length > 0
if (typeof items === 'string') {
const trimmed = items.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
// Non-JSON or invalid JSON string – allow non-empty string (could be a reference like <start.items>)
return true
}
}
// Non-JSON string – allow (may be a variable reference/expression)
return true
}
return false
})()

if (!hasNonEmptyCollection) {
const blockName = blocks[loop.id]?.name || 'Loop'
const error = new WorkflowValidationError(
`${blockName} requires a collection for forEach mode. Provide a non-empty array/object or a variable reference.`,
loop.id,
'loop',
blockName
)
throw error
}
}
})

// Validate parallels in collection mode
Object.values(parallels || {}).forEach((parallel) => {
if (!parallel) return
if ((parallel as any).parallelType === 'collection') {
const distribution = (parallel as any).distribution

const hasNonEmptyDistribution = (() => {
if (distribution === undefined || distribution === null) return false
if (Array.isArray(distribution)) return distribution.length > 0
if (typeof distribution === 'object') return Object.keys(distribution).length > 0
if (typeof distribution === 'string') {
const trimmed = distribution.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
return true
}
}
return true
}
return false
})()

if (!hasNonEmptyDistribution) {
const blockName = blocks[parallel.id]?.name || 'Parallel'
const error = new WorkflowValidationError(
`${blockName} requires a collection for collection mode. Provide a non-empty array/object or a variable reference.`,
parallel.id,
'parallel',
blockName
)
throw error
}
}
})
}

private serializeBlock(block: BlockState, validateRequired = false): SerializedBlock {
// Special handling for subflow blocks (loops, parallels, etc.)
if (block.type === 'loop' || block.type === 'parallel') {
Expand Down