Skip to content

Commit

Permalink
fix: make the process of patching pods exclusive (#12596)
Browse files Browse the repository at this point in the history
Signed-off-by: Atsushi Sakai <sakai.at24@gmail.com>
  • Loading branch information
sakai-ast authored Feb 5, 2024
1 parent 13444e6 commit e771bde
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
11 changes: 11 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type WorkflowController struct {
executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin

recentCompletions recentCompletions
podNameLocks *gosync.Map
}

type PatchOperation struct {
Expand Down Expand Up @@ -209,6 +210,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
podNameLocks: &gosync.Map{},
}

if executorPlugins {
Expand Down Expand Up @@ -605,6 +607,15 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string)
}

func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, extraPatches ...PatchOperation) error {
podNameLock, _ := wfc.podNameLocks.LoadOrStore(podName, &gosync.Mutex{})
podNameMutex := podNameLock.(*gosync.Mutex)

podNameMutex.Lock()
defer func() {
podNameMutex.Unlock()
wfc.podNameLocks.Delete(podName)
}()

var patches []PatchOperation
pod, err := wfc.getPodFromAPI(ctx, namespace, podName)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package controller

import (
"context"
gosync "sync"
"testing"
"time"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/argoproj/pkg/sync"
syncpkg "github.com/argoproj/pkg/sync"
"github.com/stretchr/testify/assert"
authorizationv1 "k8s.io/api/authorization/v1"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -283,7 +284,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
kubeclientset: kube,
dynamicInterface: dynamicClient,
wfclientset: wfclientset,
workflowKeyLock: sync.NewKeyLock(),
workflowKeyLock: syncpkg.NewKeyLock(),
wfArchive: sqldb.NullWorkflowArchive,
hydrator: hydratorfake.Noop,
estimatorFactory: estimation.DummyEstimatorFactory,
Expand All @@ -293,6 +294,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
progressPatchTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
maxStackDepth: maxAllowedStackDepth,
podNameLocks: &gosync.Map{},
}

for _, opt := range options {
Expand Down

0 comments on commit e771bde

Please sign in to comment.