Skip to content

Commit

Permalink
feat(cli): Retry workflows by label selector and field selector (#5795)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinever authored May 3, 2021
1 parent 8f2acee commit b9a79e0
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 31 deletions.
123 changes: 95 additions & 28 deletions cmd/argo/commands/retry.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
package commands

import (
"log"
"context"
"fmt"
"os"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/pkg/errors"

"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

type retryOps struct {
nodeFieldSelector string // --node-field-selector
restartSuccessful bool // --restart-successful
namespace string // --namespace
labelSelector string // --selector
fieldSelector string // --field-selector
}

// hasSelector returns true if the CLI arguments selects multiple workflows
func (o *retryOps) hasSelector() bool {
if o.labelSelector != "" || o.fieldSelector != "" {
return true
}
return false
}

func NewRetryCommand() *cobra.Command {
var (
cliSubmitOpts cliSubmitOpts
retryOps retryOps
retryOpts retryOps
)
command := &cobra.Command{
Use: "retry [WORKFLOW...]",
Expand All @@ -28,10 +44,18 @@ func NewRetryCommand() *cobra.Command {
argo retry my-wf
# Retry several workflows:
# Retry multiple workflows:
argo retry my-wf my-other-wf my-third-wf
# Retry multiple workflows by label selector:
argo retry -l workflows.argoproj.io/test=true
# Retry multiple workflows by field selector:
argo retry --field-selector metadata.namespace=argo
# Retry and wait for completion:
argo retry --wait my-wf.yaml
Expand All @@ -49,36 +73,79 @@ func NewRetryCommand() *cobra.Command {
argo retry @latest
`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 && !retryOpts.hasSelector() {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()
retryOpts.namespace = client.Namespace()

selector, err := fields.ParseSelector(retryOps.nodeFieldSelector)
if err != nil {
log.Fatalf("Unable to parse node field selector '%s': %s", retryOps.nodeFieldSelector, err)
}

for _, name := range args {
wf, err := serviceClient.RetryWorkflow(ctx, &workflowpkg.WorkflowRetryRequest{
Name: name,
Namespace: namespace,
RestartSuccessful: retryOps.restartSuccessful,
NodeFieldSelector: selector.String(),
})
if err != nil {
errors.CheckError(err)
return
}
printWorkflow(wf, getFlags{output: cliSubmitOpts.output})
waitWatchOrLog(ctx, serviceClient, namespace, []string{name}, cliSubmitOpts)
}
err := retryWorkflows(ctx, serviceClient, retryOpts, cliSubmitOpts, args)
errors.CheckError(err)
},
}
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete, only works when a single workflow is retried")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes, only works when a single workflow is retried")
command.Flags().BoolVar(&cliSubmitOpts.log, "log", false, "log the workflow until it completes")
command.Flags().BoolVar(&retryOps.restartSuccessful, "restart-successful", false, "indicates to restart successful nodes matching the --node-field-selector")
command.Flags().StringVar(&retryOps.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
command.Flags().BoolVar(&retryOpts.restartSuccessful, "restart-successful", false, "indicates to restart successful nodes matching the --node-field-selector")
command.Flags().StringVar(&retryOpts.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
command.Flags().StringVarP(&retryOpts.labelSelector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
command.Flags().StringVar(&retryOpts.fieldSelector, "field-selector", "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
return command
}

// retryWorkflows retries workflows by given retryArgs or workflow names
func retryWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, retryOpts retryOps, cliSubmitOpts cliSubmitOpts, args []string) error {
selector, err := fields.ParseSelector(retryOpts.nodeFieldSelector)
if err != nil {
return fmt.Errorf("unable to parse node field selector '%s': %s", retryOpts.nodeFieldSelector, err)
}
var wfs wfv1.Workflows
if retryOpts.hasSelector() {
wfs, err = listWorkflows(ctx, serviceClient, listFlags{
namespace: retryOpts.namespace,
fields: retryOpts.fieldSelector,
labels: retryOpts.labelSelector,
})
if err != nil {
return err
}
}

for _, n := range args {
wfs = append(wfs, wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: retryOpts.namespace,
},
})
}

var lastRetried *wfv1.Workflow
retriedNames := make(map[string]bool)
for _, wf := range wfs {
if _, ok := retriedNames[wf.Name]; ok {
// de-duplication in case there is an overlap between the selector and given workflow names
continue
}
retriedNames[wf.Name] = true

lastRetried, err = serviceClient.RetryWorkflow(ctx, &workflowpkg.WorkflowRetryRequest{
Name: wf.Name,
Namespace: wf.Namespace,
RestartSuccessful: retryOpts.restartSuccessful,
NodeFieldSelector: selector.String(),
})
if err != nil {
return err
}
printWorkflow(lastRetried, getFlags{output: cliSubmitOpts.output})
}
if len(retriedNames) == 1 {
// watch or wait when there is only one workflow retried
waitWatchOrLog(ctx, serviceClient, lastRetried.Namespace, []string{lastRetried.Name}, cliSubmitOpts)
}
return nil
}
146 changes: 146 additions & 0 deletions cmd/argo/commands/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package commands

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
workflowmocks "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow/mocks"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func Test_retryWorkflows(t *testing.T) {
t.Run("Retry workflow by names", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
retryOpts := retryOps{
namespace: "argo",
}
cliSubmitOpts := cliSubmitOpts{}

c.On("RetryWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := retryWorkflows(context.Background(), c, retryOpts, cliSubmitOpts, []string{"foo", "bar"})
c.AssertNumberOfCalls(t, "RetryWorkflow", 2)

assert.NoError(t, err)
})

t.Run("Retry workflow by selector", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
retryOpts := retryOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}

wfListReq := &workflowpkg.WorkflowListRequest{
Namespace: "argo",
ListOptions: &metav1.ListOptions{
LabelSelector: retryOpts.labelSelector,
},
}

wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "argo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "argo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "argo"}},
}}

c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil)
c.On("RetryWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := retryWorkflows(context.Background(), c, retryOpts, cliSubmitOpts, []string{})

c.AssertNumberOfCalls(t, "RetryWorkflow", 3)
for _, wf := range wfList.Items {
retryReq := &workflowpkg.WorkflowRetryRequest{
Name: wf.Name,
Namespace: wf.Namespace,
RestartSuccessful: retryOpts.restartSuccessful,
NodeFieldSelector: "",
}
c.AssertCalled(t, "RetryWorkflow", mock.Anything, retryReq)
}

assert.NoError(t, err)
})

t.Run("Retry workflow by selector and name", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
retryOpts := retryOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}

wfListReq := &workflowpkg.WorkflowListRequest{
Namespace: "argo",
ListOptions: &metav1.ListOptions{
LabelSelector: retryOpts.labelSelector,
},
}

wfList := &wfv1.WorkflowList{Items: wfv1.Workflows{
{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz"}},
}}

c.On("ListWorkflows", mock.Anything, wfListReq).Return(wfList, nil)

c.On("RetryWorkflow", mock.Anything, mock.Anything).Return(&wfv1.Workflow{}, nil)

err := retryWorkflows(context.Background(), c, retryOpts, cliSubmitOpts, []string{"foo", "qux"})
// after de-duplication, there will be 4 workflows to retry
c.AssertNumberOfCalls(t, "RetryWorkflow", 4)

// the 3 workflows from the selectors: "foo", "bar", "baz"
for _, wf := range wfList.Items {
retryReq := &workflowpkg.WorkflowRetryRequest{
Name: wf.Name,
Namespace: wf.Namespace,
RestartSuccessful: retryOpts.restartSuccessful,
NodeFieldSelector: "",
}
c.AssertCalled(t, "RetryWorkflow", mock.Anything, retryReq)
}

// the 1 workflow by the given name "qux
c.AssertCalled(t, "RetryWorkflow", mock.Anything, &workflowpkg.WorkflowRetryRequest{
Name: "qux",
Namespace: "argo",
RestartSuccessful: retryOpts.restartSuccessful,
NodeFieldSelector: "",
})

assert.NoError(t, err)
})

t.Run("Retry workflow list error", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
retryOpts := retryOps{
namespace: "argo",
labelSelector: "custom-label=true",
}
cliSubmitOpts := cliSubmitOpts{}
c.On("ListWorkflows", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
err := retryWorkflows(context.Background(), c, retryOpts, cliSubmitOpts, []string{})
assert.Errorf(t, err, "mock error")
})

t.Run("Retry workflow error", func(t *testing.T) {
c := &workflowmocks.WorkflowServiceClient{}
retryOpts := retryOps{
namespace: "argo",
}
cliSubmitOpts := cliSubmitOpts{}
c.On("RetryWorkflow", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
err := retryWorkflows(context.Background(), c, retryOpts, cliSubmitOpts, []string{"foo"})
assert.Errorf(t, err, "mock error")
})
}
16 changes: 13 additions & 3 deletions docs/cli/argo_retry.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@ argo retry [WORKFLOW...] [flags]
argo retry my-wf
# Retry several workflows:
# Retry multiple workflows:
argo retry my-wf my-other-wf my-third-wf
# Retry multiple workflows by label selector:
argo retry -l workflows.argoproj.io/test=true
# Retry multiple workflows by field selector:
argo retry --field-selector metadata.namespace=argo
# Retry and wait for completion:
argo retry --wait my-wf.yaml
Expand All @@ -42,13 +50,15 @@ argo retry [WORKFLOW...] [flags]
### Options

```
--field-selector string Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.
-h, --help help for retry
--log log the workflow until it completes
--node-field-selector string selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc
-o, --output string Output format. One of: name|json|yaml|wide
--restart-successful indicates to restart successful nodes matching the --node-field-selector
-w, --wait wait for the workflow to complete
--watch watch the workflow until it completes
-l, --selector string Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)
-w, --wait wait for the workflow to complete, only works when a single workflow is retried
--watch watch the workflow until it completes, only works when a single workflow is retried
```

### Options inherited from parent commands
Expand Down

0 comments on commit b9a79e0

Please sign in to comment.