Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Synchronization lock handling in Step/DAG Template level #5081

Merged
merged 9 commits into from
Feb 12, 2021
10 changes: 10 additions & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,8 @@ Synchronization holds synchronization lock configuration
- [`synchronization-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-tmpl-level.yaml)

- [`synchronization-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-wf-level.yaml)

- [`templates.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/workflow-template/templates.yaml)
</details>

### Fields
Expand Down Expand Up @@ -2173,6 +2175,8 @@ SynchronizationStatus stores the status of semaphore and mutex.
- [`synchronization-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-tmpl-level.yaml)

- [`synchronization-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-wf-level.yaml)

- [`templates.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/workflow-template/templates.yaml)
</details>

### Fields
Expand Down Expand Up @@ -2507,6 +2511,8 @@ SemaphoreRef is a reference of Semaphore
- [`synchronization-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-tmpl-level.yaml)

- [`synchronization-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-wf-level.yaml)

- [`templates.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/workflow-template/templates.yaml)
</details>

### Fields
Expand Down Expand Up @@ -3192,6 +3198,8 @@ _No description available_
- [`synchronization-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-tmpl-level.yaml)

- [`synchronization-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-wf-level.yaml)

- [`templates.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/workflow-template/templates.yaml)
</details>

### Fields
Expand Down Expand Up @@ -4652,6 +4660,8 @@ Selects a key from a ConfigMap.
- [`synchronization-tmpl-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-tmpl-level.yaml)

- [`synchronization-wf-level.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/synchronization-wf-level.yaml)

- [`templates.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/workflow-template/templates.yaml)
</details>

### Fields
Expand Down
5 changes: 5 additions & 0 deletions examples/workflow-template/templates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ spec:
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
- name: inner-steps
synchronization:
semaphore:
configMapKeyRef:
name: my-config
key: template
steps:
- - name: inner-hello1
templateRef:
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}

// Release the lock acquired by the completed task.
if tmpl != nil && tmpl.Synchronization != nil {
if tmpl != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, tmpl.Synchronization)
}

Expand Down
13 changes: 7 additions & 6 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,9 +1610,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,

if node != nil {
if node.Fulfilled() {
if processedTmpl.Synchronization != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)

woc.log.Debugf("Node %s already completed", nodeName)
if processedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics. If a node completes within
Expand Down Expand Up @@ -1770,9 +1769,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
if err != nil {
node = woc.markNodeError(nodeName, err)

if processedTmpl.Synchronization != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)

// If retry policy is not set, or if it is not set to Always or OnError, we won't attempt to retry an errored container
// and we return instead.
Expand All @@ -1785,6 +1782,10 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}
}

if node.Fulfilled() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the right place to check the node status and release the lock.

woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}

if processedTmpl.Metrics != nil {
// Check if the node was just created, if it was emit realtime metrics.
// If the node did not previously exist, we can infer that it was created during the current operation, emit real time metrics.
Expand Down
213 changes: 213 additions & 0 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ metadata:
data:
workflow: "2"
template: "1"
step: "1"
`
const wfWithSemaphore = `
apiVersion: argoproj.io/v1alpha1
Expand Down Expand Up @@ -486,3 +487,215 @@ func TestSynchronizationWithRetry(t *testing.T) {

})
}

// StepWithSync is a Workflow manifest named steps-jklcl whose entrypoint,
// hello-hello-hello, runs a single hello1 step backed by the whalesay
// container template. The manifest declares a semaphore synchronization
// block that references the "step" key of the my-config ConfigMap.
const StepWithSync = `
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: steps-jklcl
namespace: default
spec:
entrypoint: hello-hello-hello
templates:
- arguments: {}
name: hello-hello-hello
steps:
- - arguments:
parameters:
- name: message
value: hello1
name: hello1
template: whalesay
synchronization:
semaphore:
configMapKeyRef:
key: step
name: my-config
- arguments: {}
container:
args:
- '{{inputs.parameters.message}}'
command:
- cowsay
image: docker/whalesay
inputs:
parameters:
- name: message
name: whalesay
`

// StepWithSyncStatus is the steps-jklcl Workflow manifest with a populated
// status section: the hello1 pod node and the "[0]" step-group node are
// recorded as Succeeded, while the top-level Steps node is still recorded as
// Running. It declares the same semaphore reference (key "step" of my-config)
// as StepWithSync.
//
// NOTE(review): the first manifest line reads "piVersion" whereas StepWithSync
// uses "apiVersion"; this looks like a dropped leading "a" - confirm.
const StepWithSyncStatus = `
piVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: steps-jklcl
namespace: default
spec:
entrypoint: hello-hello-hello
templates:
- inputs: {}
name: hello-hello-hello
steps:
- - arguments:
parameters:
- name: message
value: hello1
name: hello1
template: whalesay
synchronization:
semaphore:
configMapKeyRef:
key: step
name: my-config
- container:
args:
- '{{inputs.parameters.message}}'
command:
- cowsay
image: docker/whalesay
resources: {}
inputs:
parameters:
- name: message
name: whalesay
status:
artifactRepositoryRef:
configMap: artifact-repositories
key: default-v1
namespace: argo
conditions:
- status: "False"
type: PodRunning
- status: "True"
type: Completed
finishedAt: "2021-02-11T19:46:55Z"
nodes:
steps-jklcl:
children:
- steps-jklcl-3895081407
displayName: steps-jklcl
finishedAt: "2021-02-11T19:46:55Z"
id: steps-jklcl
name: steps-jklcl
outboundNodes:
- steps-jklcl-969694128
phase: Running
progress: 1/1
resourcesDuration:
cpu: 7
memory: 4
startedAt: "2021-02-11T19:46:33Z"
templateName: hello-hello-hello
templateScope: local/steps-jklcl
type: Steps
steps-jklcl-969694128:
boundaryID: steps-jklcl
displayName: hello1
finishedAt: "2021-02-11T19:46:44Z"
id: steps-jklcl-969694128
inputs:
parameters:
- name: message
value: hello1
name: steps-jklcl[0].hello1
outputs:
artifacts:
- archiveLogs: true
name: main-logs
s3:
accessKeySecret:
key: accesskey
name: my-minio-cred
bucket: my-bucket
endpoint: minio:9000
insecure: true
key: steps-jklcl/steps-jklcl-969694128/main.log
secretKeySecret:
key: secretkey
name: my-minio-cred
exitCode: "0"
phase: Succeeded
progress: 1/1
resourcesDuration:
cpu: 7
memory: 4
startedAt: "2021-02-11T19:46:33Z"
templateName: whalesay
templateScope: local/steps-jklcl
type: Pod
steps-jklcl-3895081407:
boundaryID: steps-jklcl
children:
- steps-jklcl-969694128
displayName: '[0]'
finishedAt: "2021-02-11T19:46:55Z"
id: steps-jklcl-3895081407
name: steps-jklcl[0]
phase: Succeeded
progress: 1/1
resourcesDuration:
cpu: 7
memory: 4
startedAt: "2021-02-11T19:46:33Z"
templateScope: local/steps-jklcl
type: StepGroup
phase: Succeeded
progress: 1/1
resourcesDuration:
cpu: 7
memory: 4
startedAt: "2021-02-11T19:46:33Z"

`

// TestSynchronizationWithStep checks that a semaphore declared in the
// StepWithSync manifest is held by the first workflow that is operated on,
// leaves a second workflow waiting, and is handed over to the second workflow
// once the first workflow is operated on again with its step group completed.
func TestSynchronizationWithStep(t *testing.T) {
assert := assert.New(t)
cancel, controller := newController()
defer cancel()
ctx := context.Background()
// Install a fresh lock manager on the controller; the callback passed as
// the second argument is a no-op in this test.
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
}, workflowExistenceFunc)
// Create the ConfigMap from the configMap fixture in the "default" namespace.
var cm v1.ConfigMap
err := yaml.Unmarshal([]byte(configMap), &cm)
assert.NoError(err)
_, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &cm, metav1.CreateOptions{})
assert.NoError(err)

t.Run("StepWithSychronization", func(t *testing.T) {
// The first workflow acquires the lock: exactly one holder is expected.
wf := unmarshalWF(StepWithSync)
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows("default").Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.NotNil(woc.wf.Status.Synchronization)
assert.NotNil(woc.wf.Status.Synchronization.Semaphore)
assert.Len(woc.wf.Status.Synchronization.Semaphore.Holding, 1)

// The second workflow (same manifest, renamed "step2") tries to acquire
// the lock and must end up waiting without holding anything.
wf1 := unmarshalWF(StepWithSync)
wf1.Name = "step2"
wf1, err = controller.wfclientset.ArgoprojV1alpha1().Workflows("default").Create(ctx, wf1, metav1.CreateOptions{})
assert.NoError(err)
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotNil(woc1.wf.Status.Synchronization)
assert.NotNil(woc1.wf.Status.Synchronization.Semaphore)
assert.Nil(woc1.wf.Status.Synchronization.Semaphore.Holding)
assert.Len(woc1.wf.Status.Synchronization.Semaphore.Waiting, 1)

// Operate on the first workflow again using the status fixture in which
// its step group has finished; its synchronization status must be cleared.
wf = unmarshalWF(StepWithSyncStatus)
woc = newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Nil(woc.wf.Status.Synchronization)

// The second workflow is operated on again and now acquires the lock.
woc1 = newWorkflowOperationCtx(woc1.wf, controller)
woc1.operate(ctx)
assert.NotNil(woc1.wf.Status.Synchronization)
assert.NotNil(woc1.wf.Status.Synchronization.Semaphore)
assert.NotNil(woc1.wf.Status.Synchronization.Semaphore.Holding)
assert.Len(woc1.wf.Status.Synchronization.Semaphore.Holding, 1)
})
}
7 changes: 2 additions & 5 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm

sgNode := woc.executeStepGroup(ctx, stepGroup.Steps, sgNodeName, &stepsCtx)

if sgNode.Fulfilled() {
if tmpl.Synchronization != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, tmpl.Synchronization)
}
} else {
if !sgNode.Fulfilled() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug: the code releases the parent's lock as soon as a child completes.

woc.log.Infof("Workflow step group node %s not yet completed", sgNode.ID)
return node, nil
}
Expand Down Expand Up @@ -147,6 +143,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
}
}
}

woc.updateOutboundNodes(nodeName, tmpl)
// If this template has outputs from any of its steps, copy them to this node here
outputs, err := getTemplateOutputsFromScope(tmpl, stepsCtx.scope)
Expand Down
8 changes: 4 additions & 4 deletions workflow/sync/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (cm *Manager) getWorkflowKey(key string) (string, error) {
}

func (cm *Manager) CheckWorkflowExistence() {
log.Infof("Check the workflow existence")
log.Debug("Check the workflow existence")
for _, lock := range cm.syncLockMap {
keys := lock.getCurrentHolders()
keys = append(keys, lock.getCurrentPending()...)
Expand Down Expand Up @@ -173,13 +173,13 @@ func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *w
}

func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization) {
cm.lock.Lock()
defer cm.lock.Unlock()

if syncRef == nil {
return
}

cm.lock.Lock()
defer cm.lock.Unlock()

holderKey := getHolderKey(wf, nodeName)
lockName, err := GetLockName(syncRef, wf.Namespace)
if err != nil {
Expand Down