Skip to content

Commit 01e0723

Browse files
Sg312icecrasher321
andauthored
fix(loops): fix loops on empty collection (#3049)
* Fix * Cleanup * order of ops for validations * only reachable subflow nodes should hit validation --------- Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
1 parent 6814f33 commit 01e0723

File tree

7 files changed

+75
-20
lines changed

7 files changed

+75
-20
lines changed

apps/sim/executor/dag/construction/edges.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ export class EdgeConstructor {
207207
for (const connection of workflow.connections) {
208208
let { source, target } = connection
209209
const originalSource = source
210+
const originalTarget = target
210211
let sourceHandle = this.generateSourceHandle(
211212
source,
212213
target,
@@ -257,14 +258,14 @@ export class EdgeConstructor {
257258
target = sentinelStartId
258259
}
259260

260-
if (loopSentinelStartId) {
261-
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
262-
}
263-
264261
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
265262
continue
266263
}
267264

265+
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
266+
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
267+
}
268+
268269
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
269270
continue
270271
}

apps/sim/executor/execution/edge-manager.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,16 @@ export class EdgeManager {
7777
}
7878
}
7979

80-
// Check if any deactivation targets that previously received an activated edge are now ready
81-
for (const { target } of edgesToDeactivate) {
82-
if (
83-
!readyNodes.includes(target) &&
84-
!activatedTargets.includes(target) &&
85-
this.nodesWithActivatedEdge.has(target) &&
86-
this.isTargetReady(target)
87-
) {
88-
readyNodes.push(target)
80+
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
81+
for (const { target } of edgesToDeactivate) {
82+
if (
83+
!readyNodes.includes(target) &&
84+
!activatedTargets.includes(target) &&
85+
this.nodesWithActivatedEdge.has(target) &&
86+
this.isTargetReady(target)
87+
) {
88+
readyNodes.push(target)
89+
}
8990
}
9091
}
9192

apps/sim/executor/execution/engine.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,12 @@ export class ExecutionEngine {
390390
logger.info('Processing outgoing edges', {
391391
nodeId,
392392
outgoingEdgesCount: node.outgoingEdges.size,
393+
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
394+
id,
395+
target: e.target,
396+
sourceHandle: e.sourceHandle,
397+
})),
398+
output,
393399
readyNodesCount: readyNodes.length,
394400
readyNodes,
395401
})

apps/sim/executor/execution/state.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export interface ParallelScope {
2727
items?: any[]
2828
/** Error message if parallel validation failed (e.g., exceeded max branches) */
2929
validationError?: string
30+
/** Whether the parallel has an empty distribution and should be skipped */
31+
isEmpty?: boolean
3032
}
3133

3234
export class ExecutionState implements BlockStateController {

apps/sim/executor/orchestrators/loop.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,10 +386,10 @@ export class LoopOrchestrator {
386386
return true
387387
}
388388

389-
// forEach: skip if items array is empty
390389
if (scope.loopType === 'forEach') {
391390
if (!scope.items || scope.items.length === 0) {
392-
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
391+
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
392+
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
393393
return false
394394
}
395395
return true
@@ -399,6 +399,8 @@ export class LoopOrchestrator {
399399
if (scope.loopType === 'for') {
400400
if (scope.maxIterations === 0) {
401401
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
402+
// Set empty output for the loop
403+
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
402404
return false
403405
}
404406
return true

apps/sim/executor/orchestrators/node.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
9797
if (loopId) {
9898
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
9999
if (!shouldExecute) {
100-
logger.info('While loop initial condition false, skipping loop body', { loopId })
100+
logger.info('Loop initial condition false, skipping loop body', { loopId })
101101
return {
102102
sentinelStart: true,
103103
shouldExit: true,
@@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
158158
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
159159
}
160160
}
161+
162+
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
163+
if (scope?.isEmpty) {
164+
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
165+
return {
166+
sentinelStart: true,
167+
shouldExit: true,
168+
selectedRoute: EDGE.PARALLEL_EXIT,
169+
}
170+
}
171+
161172
return { sentinelStart: true }
162173
}
163174

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
6161

6262
let items: any[] | undefined
6363
let branchCount: number
64+
let isEmpty = false
6465

6566
try {
66-
const resolved = this.resolveBranchCount(ctx, parallelConfig)
67+
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
6768
branchCount = resolved.branchCount
6869
items = resolved.items
70+
isEmpty = resolved.isEmpty ?? false
6971
} catch (error) {
7072
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
7173
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
9193
throw new Error(branchError)
9294
}
9395

96+
// Handle empty distribution - skip parallel body
97+
if (isEmpty || branchCount === 0) {
98+
const scope: ParallelScope = {
99+
parallelId,
100+
totalBranches: 0,
101+
branchOutputs: new Map(),
102+
completedCount: 0,
103+
totalExpectedNodes: 0,
104+
items: [],
105+
isEmpty: true,
106+
}
107+
108+
if (!ctx.parallelExecutions) {
109+
ctx.parallelExecutions = new Map()
110+
}
111+
ctx.parallelExecutions.set(parallelId, scope)
112+
113+
// Set empty output for the parallel
114+
this.state.setBlockOutput(parallelId, { results: [] })
115+
116+
logger.info('Parallel scope initialized with empty distribution, skipping body', {
117+
parallelId,
118+
branchCount: 0,
119+
})
120+
121+
return scope
122+
}
123+
94124
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
95125

96126
const scope: ParallelScope = {
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
127157

128158
private resolveBranchCount(
129159
ctx: ExecutionContext,
130-
config: SerializedParallel
131-
): { branchCount: number; items?: any[] } {
160+
config: SerializedParallel,
161+
parallelId: string
162+
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
132163
if (config.parallelType === 'count') {
133164
return { branchCount: config.count ?? 1 }
134165
}
135166

136167
const items = this.resolveDistributionItems(ctx, config)
137168
if (items.length === 0) {
138-
return { branchCount: config.count ?? 1 }
169+
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
170+
return { branchCount: 0, items: [], isEmpty: true }
139171
}
140172

141173
return { branchCount: items.length, items }

0 commit comments

Comments
 (0)