Skip to content

Commit

Permalink
fix(controller): Same workflow nodes are not executing parallel even …
Browse files Browse the repository at this point in the history
…semaphore locks available (#6418)

* fix(controller): Same workflow nodes are not executing parallel in semaphore
  • Loading branch information
sarabala1979 authored Jul 27, 2021
1 parent c29b275 commit 7ec5b3e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
} else {
woc.log.Infof("Node %s acquired synchronization lock", nodeName)
if node != nil {
node.Message = ""
node = woc.markNodeWaitingForLock(node.Name, "")
}
}
Expand Down Expand Up @@ -2215,6 +2214,7 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri
if lockName == "" {
// If we are no longer waiting for a lock, nil out the sync status
node.SynchronizationStatus = nil
node.Message = ""
} else {
node.SynchronizationStatus.Waiting = lockName
}
Expand Down
13 changes: 12 additions & 1 deletion workflow/sync/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func (s *PrioritySemaphore) acquire(holderKey string) bool {
return false
}

func isSameWorkflowNodeKeys(firstKey, secondKey string) bool {
firstItems := strings.Split(firstKey, "/")
secondItems := strings.Split(secondKey, "/")

if len(firstItems) != len(secondItems) {
return false
}
// compare workflow name
return firstItems[1] == secondItems[1]
}

func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) {
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -165,7 +176,7 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) {
if s.pending.Len() > 0 {
item := s.pending.peek()
nextKey = fmt.Sprintf("%v", item.key)
if holderKey != nextKey {
if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, nextKey) {
// Enqueue the front workflow if lock is available
if len(s.lockHolder) < s.limit {
s.nextWorkflow(nextKey)
Expand Down
21 changes: 21 additions & 0 deletions workflow/sync/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sync

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsSameWorkflowNodeKeys(t *testing.T) {
wfkey1 := "default/wf-1"
wfkey2 := "default/wf-2"
nodeWf1key1 := "default/wf-1/node-1"
nodeWf1key2 := "default/wf-1/node-2"
nodeWf2key1 := "default/wf-2/node-1"
nodeWf2key2 := "default/wf-2/node-2"
assert.True(t, isSameWorkflowNodeKeys(nodeWf1key1, nodeWf1key2))
assert.True(t, isSameWorkflowNodeKeys(wfkey1, wfkey1))
assert.False(t, isSameWorkflowNodeKeys(nodeWf1key1, nodeWf2key1))
assert.False(t, isSameWorkflowNodeKeys(wfkey1, wfkey2))
assert.True(t, isSameWorkflowNodeKeys(nodeWf2key1, nodeWf2key2))
}

0 comments on commit 7ec5b3e

Please sign in to comment.