Skip to content

Commit

Permalink
fix: do not process withParams when task/step Skipped
Browse files Browse the repository at this point in the history
Signed-off-by: Isitha Subasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe committed Aug 13, 2023
1 parent fa09afc commit ec16322
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 22 deletions.
50 changes: 33 additions & 17 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,17 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
return
}

// check if we need to expand before actually expanding,
// we check the need by evaluating the newTask.When
mustExpand, err := shouldExecute(newTask.When)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, err.Error())
connectDependencies(nodeName)
}

// Next, expand the DAG's withItems/withParams/withSequence (if any). If there was none, then
// expandedTasks will be a single element list of the same task
expandedTasks, err := expandTask(*newTask)
expandedTasks, err := expandTaskLazy(*newTask, mustExpand)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, err.Error())
connectDependencies(nodeName)
Expand All @@ -549,7 +557,13 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
if task.ShouldExpand() {
// DAG task with empty withParams list should be skipped
if len(expandedTasks) == 0 {
skipReason := "Skipped, empty params"
// either because empty params or because mustExpand is false
skipReason := ""
if !mustExpand {
skipReason = "Skipped, task not marked as needed to expand"
} else {
skipReason = "Skipped, empty params"
}
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, skipReason)
connectDependencies(nodeName)
} else if taskGroupNode == nil {
Expand Down Expand Up @@ -757,24 +771,26 @@ func (d *dagContext) findLeafTaskNames(tasks []wfv1.DAGTask) []string {
return leafTaskNames
}

// expandTask expands a single DAG task containing withItems, withParams, withSequence into multiple parallel tasks
func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
// expandTaskLazy if the expand flag is given - expands a single DAG task containing withItems, withParams, withSequence into multiple parallel tasks
func expandTaskLazy(task wfv1.DAGTask, expand bool) ([]wfv1.DAGTask, error) {
var err error
var items []wfv1.Item
if len(task.WithItems) > 0 {
items = task.WithItems
} else if task.WithParam != "" {
err = json.Unmarshal([]byte(task.WithParam), &items)
if err != nil {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(task.WithParam), err)
}
} else if task.WithSequence != nil {
items, err = expandSequence(task.WithSequence)
if err != nil {
return nil, err
if expand {
if len(task.WithItems) > 0 {
items = task.WithItems
} else if task.WithParam != "" {
err = json.Unmarshal([]byte(task.WithParam), &items)
if err != nil {
return nil, errors.Errorf(errors.CodeBadRequest, "withParam value could not be parsed as a JSON list: %s: %v", strings.TrimSpace(task.WithParam), err)
}
} else if task.WithSequence != nil {
items, err = expandSequence(task.WithSequence)
if err != nil {
return nil, err
}
} else {
return []wfv1.DAGTask{task}, nil
}
} else {
return []wfv1.DAGTask{task}, nil
}

taskBytes, err := json.Marshal(task)
Expand Down
23 changes: 18 additions & 5 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,14 @@ func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.W
newStepGroup = append(newStepGroup, step)
continue
}
expandedStep, err := woc.expandStep(step)

// do we need to actually expand?
mustExpand, err := shouldExecute(step.When)
if err != nil {
return nil, err
}

expandedStep, err := woc.expandStepLazy(step, mustExpand)
if err != nil {
return nil, err
}
Expand All @@ -486,7 +493,12 @@ func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.W
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
if _, err := woc.wf.GetNodeByName(childNodeName); err != nil {
stepTemplateScope := stepsCtx.tmplCtx.GetTemplateScope()
skipReason := "Skipped, empty params"
skipReason := ""
if !mustExpand {
skipReason = "Skipped, step not marked as needed to expand"
} else {
skipReason = "Skipped, empty params"
}
woc.log.Infof("Skipping %s: %s", childNodeName, skipReason)
woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, stepTemplateScope, &step, stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason)
woc.addChildNode(sgNodeName, childNodeName)
Expand All @@ -497,8 +509,9 @@ func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.W
return newStepGroup, nil
}

// expandStep expands a step containing withItems or withParams into multiple parallel steps
func (woc *wfOperationCtx) expandStep(step wfv1.WorkflowStep) ([]wfv1.WorkflowStep, error) {
// expandStepLazy expands a step containing withItems or withParams into multiple parallel steps only if needed, this need
// is determined by the expand parameter.
func (woc *wfOperationCtx) expandStepLazy(step wfv1.WorkflowStep, expand bool) ([]wfv1.WorkflowStep, error) {
var err error
expandedStep := make([]wfv1.WorkflowStep, 0)
var items []wfv1.Item
Expand Down Expand Up @@ -557,7 +570,7 @@ func (woc *wfOperationCtx) prepareDefaultMetricScope() (map[string]string, map[s
localScope[durationCPU] = "0"
localScope[durationMem] = "0"

var realTimeScope = map[string]func() float64{
realTimeScope := map[string]func() float64{
common.GlobalVarWorkflowDuration: func() float64 {
if woc.wf.Status.Phase.Completed() {
return woc.wf.Status.FinishedAt.Time.Sub(woc.wf.Status.StartedAt.Time).Seconds()
Expand Down

0 comments on commit ec16322

Please sign in to comment.