Skip to content

Commit

Permalink
fix: lazily expand tasks/steps
Browse files Browse the repository at this point in the history
Signed-off-by: Isitha Subasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe committed Aug 18, 2023
1 parent a0fd619 commit 6178617
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
13 changes: 11 additions & 2 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,9 @@ func (d *dagContext) findLeafTaskNames(tasks []wfv1.DAGTask) []string {
}

// expandTask expands a single DAG task containing withItems, withParams, withSequence into multiple parallel tasks
// We want to be lazy with expanding. Unfortunately this is not quite possible as the When field might rely on
// expansion to work with the shouldExecute function. To address this we apply a trick, we try to expand, if we fail, we then
// check shouldExecute, if shouldExecute returns false, we continue on as normal else error out
func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
var err error
var items []wfv1.Item
Expand All @@ -766,12 +769,18 @@ func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
} else if task.WithParam != "" {
err = json.Unmarshal([]byte(task.WithParam), &items)
if err != nil {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(task.WithParam), err)
mustExec, mustExecErr := shouldExecute(task.When)
if mustExecErr != nil || mustExec {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(task.WithParam), err)
}
}
} else if task.WithSequence != nil {
items, err = expandSequence(task.WithSequence)
if err != nil {
return nil, err
mustExec, mustExecErr := shouldExecute(task.When)
if mustExecErr != nil || mustExec {
return nil, err
}
}
} else {
return []wfv1.DAGTask{task}, nil
Expand Down
13 changes: 11 additions & 2 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.W
}

// expandStep expands a step containing withItems or withParams into multiple parallel steps
// We want to be lazy with expanding. Unfortunately this is not quite possible as the When field might rely on
// expansion to work with the shouldExecute function. To address this we apply a trick, we try to expand, if we fail, we then
// check shouldExecute, if shouldExecute returns false, we continue on as normal else error out
func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowStep, error) {
var err error
expandedStep := make([]wfv1.WorkflowStep, 0)
Expand All @@ -507,12 +510,18 @@ func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowSt
} else if step.WithParam != "" {
err = json.Unmarshal([]byte(step.WithParam), &items)
if err != nil {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(step.WithParam), err)
mustExec, mustExecErr := shouldExecute(step.When)
if mustExecErr != nil || mustExec {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(step.WithParam), err)
}
}
} else if step.WithSequence != nil {
items, err = expandSequence(step.WithSequence)
if err != nil {
return nil, err
mustExec, mustExecErr := shouldExecute(step.When)
if mustExecErr != nil || mustExec {
return nil, err
}
}
} else {
// this should have been prevented in expandStepGroup()
Expand Down

0 comments on commit 6178617

Please sign in to comment.