Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Immediately release locks held by pending workflows that are shutting down. Fixes #10733 #10735

Merged
merged 12 commits into from
Mar 27, 2023
39 changes: 27 additions & 12 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
woc.updated = wfUpdate
if !acquired {
woc.log.Warn("Workflow processing has been postponed due to concurrency limit")
phase := woc.wf.Status.Phase
if phase == wfv1.WorkflowUnknown {
phase = wfv1.WorkflowPending
if !woc.releaseLocksForPendingTerminatingWfs(ctx) {
woc.log.Warn("Workflow processing has been postponed due to concurrency limit")
phase := woc.wf.Status.Phase
if phase == wfv1.WorkflowUnknown {
phase = wfv1.WorkflowPending
}
woc.markWorkflowPhase(ctx, phase, msg)
return
}
woc.markWorkflowPhase(ctx, phase, msg)
return
}
}

Expand Down Expand Up @@ -490,6 +492,17 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}

// releaseLocksForPendingTerminatingWfs handles a workflow that is still in the
// Pending phase while its shutdown strategy is Terminate. In that case it asks
// the sync manager to release everything associated with the workflow and, if
// the sync manager reports success, logs the release and marks the workflow
// as succeeded.
//
// It returns true only when all three conditions held (Terminate strategy,
// Pending phase, ReleaseAll returned true); in every other case it returns
// false and leaves the workflow untouched.
func (woc *wfOperationCtx) releaseLocksForPendingTerminatingWfs(ctx context.Context) bool {
	// Only workflows that are being terminated are of interest here.
	if woc.GetShutdownStrategy() != wfv1.ShutdownStrategyTerminate {
		return false
	}
	// ...and only while they have not progressed past Pending.
	if woc.execWf.Status.Phase != wfv1.WorkflowPending {
		return false
	}
	// NOTE(review): ReleaseAll's exact semantics live in the sync manager;
	// a false result is treated here as "nothing was released".
	if !woc.controller.syncManager.ReleaseAll(woc.execWf) {
		return false
	}

	entry := woc.log.WithFields(log.Fields{"key": woc.execWf.Name})
	entry.Info("Released all locks since this pending workflow is terminating")
	woc.markWorkflowSuccess(ctx)
	return true
}
jessesuen marked this conversation as resolved.
Show resolved Hide resolved

// set Labels and Annotations for the Workflow
// Also, since we're setting Labels and Annotations we need to find any
// parameters formatted as "workflow.labels.<param>" or "workflow.annotations.<param>"
Expand Down Expand Up @@ -1868,13 +1881,15 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
if node == nil {
node = woc.initializeExecutableNode(nodeName, wfutil.GetNodeType(processedTmpl), templateScope, processedTmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, msg)
}
lockName, err := argosync.GetLockName(processedTmpl.Synchronization, woc.wf.Namespace)
if err != nil {
// If an error were to be returned here, it would have been caught by TryAcquire. If it didn't, then it is
// unexpected behavior and is a bug.
panic("bug: GetLockName should not return an error after a call to TryAcquire")
if !woc.releaseLocksForPendingTerminatingWfs(ctx) {
lockName, err := argosync.GetLockName(processedTmpl.Synchronization, woc.wf.Namespace)
if err != nil {
// If an error were to be returned here, it would have been caught by TryAcquire. If it didn't, then it is
// unexpected behavior and is a bug.
panic("bug: GetLockName should not return an error after a call to TryAcquire")
}
return woc.markNodeWaitingForLock(node.Name, lockName.EncodeName()), nil
}
return woc.markNodeWaitingForLock(node.Name, lockName.EncodeName()), nil
} else {
woc.log.Infof("Node %s acquired synchronization lock", nodeName)
if node != nil {
Expand Down