Skip to content

Commit f4f293a

Browse files
committed
feat(terminal): real-time child workflow block propagation in console
1 parent b6fb7f5 commit f4f293a

File tree

11 files changed

+218
-3
lines changed

11 files changed

+218
-3
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,12 +919,34 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
919919
selectedOutputs
920920
)
921921

922+
const onChildWorkflowInstanceReady = (
923+
blockId: string,
924+
childWorkflowInstanceId: string,
925+
iterationContext?: IterationContext
926+
) => {
927+
sendEvent({
928+
type: 'block:childWorkflowStarted',
929+
timestamp: new Date().toISOString(),
930+
executionId,
931+
workflowId,
932+
data: {
933+
blockId,
934+
childWorkflowInstanceId,
935+
...(iterationContext && {
936+
iterationCurrent: iterationContext.iterationCurrent,
937+
iterationContainerId: iterationContext.iterationContainerId,
938+
}),
939+
},
940+
})
941+
}
942+
922943
const result = await executeWorkflowCore({
923944
snapshot,
924945
callbacks: {
925946
onBlockStart,
926947
onBlockComplete,
927948
onStream,
949+
onChildWorkflowInstanceReady,
928950
},
929951
loggingSession,
930952
abortSignal: timeoutController.signal,

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,27 @@ export function useWorkflowExecution() {
549549
}
550550
}
551551

552-
return { onBlockStarted, onBlockCompleted, onBlockError }
552+
const onBlockChildWorkflowStarted = (data: {
553+
blockId: string
554+
childWorkflowInstanceId: string
555+
iterationCurrent?: number
556+
iterationContainerId?: string
557+
}) => {
558+
if (isStaleExecution()) return
559+
updateConsole(
560+
data.blockId,
561+
{
562+
childWorkflowInstanceId: data.childWorkflowInstanceId,
563+
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
564+
...(data.iterationContainerId !== undefined && {
565+
iterationContainerId: data.iterationContainerId,
566+
}),
567+
},
568+
executionIdRef.current
569+
)
570+
}
571+
572+
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
553573
},
554574
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
555575
)
@@ -1349,6 +1369,7 @@ export function useWorkflowExecution() {
13491369
onBlockStarted: blockHandlers.onBlockStarted,
13501370
onBlockCompleted: blockHandlers.onBlockCompleted,
13511371
onBlockError: blockHandlers.onBlockError,
1372+
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
13521373

13531374
onStreamChunk: (data) => {
13541375
const existing = streamedContent.get(data.blockId) || ''
@@ -1946,6 +1967,7 @@ export function useWorkflowExecution() {
19461967
onBlockStarted: blockHandlers.onBlockStarted,
19471968
onBlockCompleted: blockHandlers.onBlockCompleted,
19481969
onBlockError: blockHandlers.onBlockError,
1970+
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
19491971

19501972
onExecutionCompleted: (data) => {
19511973
if (data.success) {
@@ -2174,6 +2196,10 @@ export function useWorkflowExecution() {
21742196
clearOnce()
21752197
handlers.onBlockError(data)
21762198
},
2199+
onBlockChildWorkflowStarted: (data) => {
2200+
clearOnce()
2201+
handlers.onBlockChildWorkflowStarted(data)
2202+
},
21772203
onExecutionCompleted: () => {
21782204
const currentId = useExecutionStore
21792205
.getState()

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,24 @@ export async function executeWorkflowWithFullLogging(
220220
break
221221
}
222222

223+
case 'block:childWorkflowStarted': {
224+
const { updateConsole } = useTerminalConsoleStore.getState()
225+
updateConsole(
226+
event.data.blockId,
227+
{
228+
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
229+
...(event.data.iterationCurrent !== undefined && {
230+
iterationCurrent: event.data.iterationCurrent,
231+
}),
232+
...(event.data.iterationContainerId !== undefined && {
233+
iterationContainerId: event.data.iterationContainerId,
234+
}),
235+
},
236+
executionId
237+
)
238+
break
239+
}
240+
223241
case 'execution:completed':
224242
executionResult = {
225243
success: event.data.success,

apps/sim/executor/execution/block-executor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ export class BlockExecutor {
208208
parallelId?: string
209209
branchIndex?: number
210210
branchTotal?: number
211+
originalBlockId?: string
212+
isLoopNode?: boolean
211213
} {
212214
const metadata = node?.metadata ?? {}
213215
return {
@@ -216,6 +218,8 @@ export class BlockExecutor {
216218
parallelId: metadata.parallelId,
217219
branchIndex: metadata.branchIndex,
218220
branchTotal: metadata.branchTotal,
221+
originalBlockId: metadata.originalBlockId,
222+
isLoopNode: metadata.isLoopNode,
219223
}
220224
}
221225

apps/sim/executor/execution/executor.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ export class DAGExecutor {
322322
onStream: this.contextExtensions.onStream,
323323
onBlockStart: this.contextExtensions.onBlockStart,
324324
onBlockComplete: this.contextExtensions.onBlockComplete,
325+
onChildWorkflowInstanceReady: this.contextExtensions.onChildWorkflowInstanceReady,
325326
abortSignal: this.contextExtensions.abortSignal,
326327
childWorkflowContext: this.contextExtensions.childWorkflowContext,
327328
includeFileBase64: this.contextExtensions.includeFileBase64,

apps/sim/executor/execution/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ export interface ExecutionCallbacks {
8383
iterationContext?: IterationContext,
8484
childWorkflowContext?: ChildWorkflowContext
8585
) => Promise<void>
86+
/** Fires immediately after instanceId is generated, before child execution begins. */
87+
onChildWorkflowInstanceReady?: (
88+
blockId: string,
89+
childWorkflowInstanceId: string,
90+
iterationContext?: IterationContext
91+
) => void
8692
}
8793

8894
export interface ContextExtensions {
@@ -142,6 +148,13 @@ export interface ContextExtensions {
142148
/** Context identifying this execution as a child of a workflow block */
143149
childWorkflowContext?: ChildWorkflowContext
144150

151+
/** Fires immediately after instanceId is generated, before child execution begins. */
152+
onChildWorkflowInstanceReady?: (
153+
blockId: string,
154+
childWorkflowInstanceId: string,
155+
iterationContext?: IterationContext
156+
) => void
157+
145158
/**
146159
* Run-from-block configuration. When provided, executor runs in partial
147160
* execution mode starting from the specified block.

apps/sim/executor/handlers/workflow/workflow-handler.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { BlockOutput } from '@/blocks/types'
66
import { Executor } from '@/executor'
77
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
88
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
9+
import type { IterationContext } from '@/executor/execution/types'
910
import type {
1011
BlockHandler,
1112
ExecutionContext,
@@ -44,6 +45,40 @@ export class WorkflowBlockHandler implements BlockHandler {
4445
ctx: ExecutionContext,
4546
block: SerializedBlock,
4647
inputs: Record<string, any>
48+
): Promise<BlockOutput | StreamingExecution> {
49+
return this._executeCore(ctx, block, inputs)
50+
}
51+
52+
async executeWithNode(
53+
ctx: ExecutionContext,
54+
block: SerializedBlock,
55+
inputs: Record<string, any>,
56+
nodeMetadata: {
57+
nodeId: string
58+
loopId?: string
59+
parallelId?: string
60+
branchIndex?: number
61+
branchTotal?: number
62+
originalBlockId?: string
63+
isLoopNode?: boolean
64+
}
65+
): Promise<BlockOutput | StreamingExecution> {
66+
return this._executeCore(ctx, block, inputs, nodeMetadata)
67+
}
68+
69+
private async _executeCore(
70+
ctx: ExecutionContext,
71+
block: SerializedBlock,
72+
inputs: Record<string, any>,
73+
nodeMetadata?: {
74+
nodeId: string
75+
loopId?: string
76+
parallelId?: string
77+
branchIndex?: number
78+
branchTotal?: number
79+
originalBlockId?: string
80+
isLoopNode?: boolean
81+
}
4782
): Promise<BlockOutput | StreamingExecution> {
4883
logger.info(`Executing workflow block: ${block.id}`)
4984

@@ -122,6 +157,12 @@ export class WorkflowBlockHandler implements BlockHandler {
122157
const childDepth = (ctx.childWorkflowContext?.depth ?? 0) + 1
123158
const shouldPropagateCallbacks = childDepth <= DEFAULTS.MAX_SSE_CHILD_DEPTH
124159

160+
if (nodeMetadata && shouldPropagateCallbacks) {
161+
const effectiveBlockId = nodeMetadata.originalBlockId ?? nodeMetadata.nodeId
162+
const iterationContext = this.getIterationContext(ctx, nodeMetadata)
163+
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
164+
}
165+
125166
const subExecutor = new Executor({
126167
workflow: childWorkflow.serializedState,
127168
workflowInput: childWorkflowInput,
@@ -138,6 +179,7 @@ export class WorkflowBlockHandler implements BlockHandler {
138179
onBlockStart: ctx.onBlockStart,
139180
onBlockComplete: ctx.onBlockComplete,
140181
onStream: ctx.onStream as ((streamingExecution: unknown) => Promise<void>) | undefined,
182+
onChildWorkflowInstanceReady: ctx.onChildWorkflowInstanceReady,
141183
childWorkflowContext: {
142184
parentBlockId: instanceId,
143185
workflowName: childWorkflowName,
@@ -208,6 +250,40 @@ export class WorkflowBlockHandler implements BlockHandler {
208250
}
209251
}
210252

253+
private getIterationContext(
254+
ctx: ExecutionContext,
255+
nodeMetadata: {
256+
loopId?: string
257+
parallelId?: string
258+
branchIndex?: number
259+
branchTotal?: number
260+
isLoopNode?: boolean
261+
}
262+
): IterationContext | undefined {
263+
if (nodeMetadata.branchIndex !== undefined && nodeMetadata.parallelId !== undefined) {
264+
return {
265+
iterationCurrent: nodeMetadata.branchIndex,
266+
iterationTotal: nodeMetadata.branchTotal,
267+
iterationType: 'parallel',
268+
iterationContainerId: nodeMetadata.parallelId,
269+
}
270+
}
271+
272+
if (nodeMetadata.isLoopNode && nodeMetadata.loopId) {
273+
const loopScope = ctx.loopExecutions?.get(nodeMetadata.loopId)
274+
if (loopScope && loopScope.iteration !== undefined) {
275+
return {
276+
iterationCurrent: loopScope.iteration,
277+
iterationTotal: loopScope.maxIterations,
278+
iterationType: 'loop',
279+
iterationContainerId: nodeMetadata.loopId,
280+
}
281+
}
282+
}
283+
284+
return undefined
285+
}
286+
211287
/**
212288
* Builds a cleaner error message for nested workflow errors.
213289
* Parses nested error messages to extract workflow chain and root error.

apps/sim/executor/types.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,13 @@ export interface ExecutionContext {
259259
/** Context identifying this execution as a child of a workflow block */
260260
childWorkflowContext?: ChildWorkflowContext
261261

262+
/** Fires immediately after instanceId is generated, before child execution begins. */
263+
onChildWorkflowInstanceReady?: (
264+
blockId: string,
265+
childWorkflowInstanceId: string,
266+
iterationContext?: IterationContext
267+
) => void
268+
262269
/**
263270
* AbortSignal for cancellation support.
264271
* When the signal is aborted, execution should stop gracefully.
@@ -361,6 +368,8 @@ export interface BlockHandler {
361368
parallelId?: string
362369
branchIndex?: number
363370
branchTotal?: number
371+
originalBlockId?: string
372+
isLoopNode?: boolean
364373
}
365374
) => Promise<BlockOutput | StreamingExecution>
366375
}

apps/sim/hooks/use-execution-stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { useCallback } from 'react'
22
import { createLogger } from '@sim/logger'
33
import type {
4+
BlockChildWorkflowStartedData,
45
BlockCompletedData,
56
BlockErrorData,
67
BlockStartedData,
@@ -83,6 +84,9 @@ async function processSSEStream(
8384
case 'block:error':
8485
callbacks.onBlockError?.(event.data)
8586
break
87+
case 'block:childWorkflowStarted':
88+
callbacks.onBlockChildWorkflowStarted?.(event.data)
89+
break
8690
case 'stream:chunk':
8791
callbacks.onStreamChunk?.(event.data)
8892
break
@@ -110,6 +114,7 @@ export interface ExecutionStreamCallbacks {
110114
onBlockStarted?: (data: BlockStartedData) => void
111115
onBlockCompleted?: (data: BlockCompletedData) => void
112116
onBlockError?: (data: BlockErrorData) => void
117+
onBlockChildWorkflowStarted?: (data: BlockChildWorkflowStartedData) => void
113118
onStreamChunk?: (data: StreamChunkData) => void
114119
onStreamDone?: (data: StreamDoneData) => void
115120
}

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export async function executeWorkflowCore(
129129
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
130130
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
131131
metadata
132-
const { onBlockStart, onBlockComplete, onStream } = callbacks
132+
const { onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady } = callbacks
133133

134134
const providedWorkspaceId = metadata.workspaceId
135135
if (!providedWorkspaceId) {
@@ -329,6 +329,7 @@ export async function executeWorkflowCore(
329329
includeFileBase64,
330330
base64MaxBytes,
331331
stopAfterBlockId: resolvedStopAfterBlockId,
332+
onChildWorkflowInstanceReady,
332333
}
333334

334335
const executorInstance = new Executor({

0 commit comments

Comments
 (0)