From 7ec5b3ea9e55618f1522dd7e50bbf54baad1ca39 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian <33908564+sarabala1979@users.noreply.github.com> Date: Tue, 27 Jul 2021 11:37:52 -0700 Subject: [PATCH] fix(controller): Same workflow nodes are not executing parallel even semaphore locks available (#6418) * fix(controller): Same workflow nodes are not executing parallel in semaphore --- workflow/controller/operator.go | 2 +- workflow/sync/semaphore.go | 13 ++++++++++++- workflow/sync/semaphore_test.go | 21 +++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 workflow/sync/semaphore_test.go diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 50cf9aa78377..4c39c629c9e2 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1674,7 +1674,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } else { woc.log.Infof("Node %s acquired synchronization lock", nodeName) if node != nil { - node.Message = "" node = woc.markNodeWaitingForLock(node.Name, "") } } @@ -2215,6 +2214,7 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri if lockName == "" { // If we are no longer waiting for a lock, nil out the sync status node.SynchronizationStatus = nil + node.Message = "" } else { node.SynchronizationStatus.Waiting = lockName } diff --git a/workflow/sync/semaphore.go b/workflow/sync/semaphore.go index 7b6055ae8d2d..f842c04d6f70 100644 --- a/workflow/sync/semaphore.go +++ b/workflow/sync/semaphore.go @@ -147,6 +147,17 @@ func (s *PrioritySemaphore) acquire(holderKey string) bool { return false } +func isSameWorkflowNodeKeys(firstKey, secondKey string) bool { + firstItems := strings.Split(firstKey, "/") + secondItems := strings.Split(secondKey, "/") + + if len(firstItems) != len(secondItems) { + return false + } + // compare workflow name + return firstItems[1] == secondItems[1] +} + func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) { s.lock.Lock() defer s.lock.Unlock() @@ -165,7 +176,7 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) { if s.pending.Len() > 0 { item := s.pending.peek() nextKey = fmt.Sprintf("%v", item.key) - if holderKey != nextKey { + if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, nextKey) { // Enqueue the front workflow if lock is available if len(s.lockHolder) < s.limit { s.nextWorkflow(nextKey) diff --git a/workflow/sync/semaphore_test.go b/workflow/sync/semaphore_test.go new file mode 100644 index 000000000000..18bfb2bd5932 --- /dev/null +++ b/workflow/sync/semaphore_test.go @@ -0,0 +1,21 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsSameWorkflowNodeKeys(t *testing.T) { + wfkey1 := "default/wf-1" + wfkey2 := "default/wf-2" + nodeWf1key1 := "default/wf-1/node-1" + nodeWf1key2 := "default/wf-1/node-2" + nodeWf2key1 := "default/wf-2/node-1" + nodeWf2key2 := "default/wf-2/node-2" + assert.True(t, isSameWorkflowNodeKeys(nodeWf1key1, nodeWf1key2)) + assert.True(t, isSameWorkflowNodeKeys(wfkey1, wfkey1)) + assert.False(t, isSameWorkflowNodeKeys(nodeWf1key1, nodeWf2key1)) + assert.False(t, isSameWorkflowNodeKeys(wfkey1, wfkey2)) + assert.True(t, isSameWorkflowNodeKeys(nodeWf2key1, nodeWf2key2)) +}