Skip to content

Commit

Permalink
feat(EventSensor): Dead Letter Queue Trigger (#3199)
Browse files Browse the repository at this point in the history
Signed-off-by: Taleb Zeghmi <talebz@zillowgroup.com>
  • Loading branch information
talebzeghmi authored Jul 12, 2024
1 parent a6b45f1 commit 2451789
Show file tree
Hide file tree
Showing 14 changed files with 614 additions and 324 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ debug.test
site/
/go-diagrams/
argo-events
.swo
.swp
# ignore temp vi files
*.swo
*.swp
4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4514,6 +4514,10 @@
"description": "AtLeastOnce determines the trigger execution semantics. Defaults to false. Trigger execution will use at-most-once semantics. If set to true, Trigger execution will switch to at-least-once semantics.",
"type": "boolean"
},
"dlqTrigger": {
"$ref": "#/definitions/io.argoproj.sensor.v1alpha1.Trigger",
"description": "If the trigger fails, it will retry up to the configured number of retries. If the maximum retries are reached and the trigger is set to execute atLeastOnce, the dead letter queue (DLQ) trigger will be invoked if specified. Invoking the dead letter queue trigger helps prevent data loss."
},
"parameters": {
"description": "Parameters is the list of parameters applied to the trigger template definition",
"items": {
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 36 additions & 7 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,52 @@ func validateTriggers(triggers []v1alpha1.Trigger) error {
trigNames := make(map[string]bool)

for _, trigger := range triggers {
if err := validateTriggerTemplate(trigger.Template); err != nil {
if err := validateTrigger(trigger); err != nil {
return err
}
if _, ok := trigNames[trigger.Template.Name]; ok {
return fmt.Errorf("duplicate trigger name: %s", trigger.Template.Name)
}
trigNames[trigger.Template.Name] = true
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
}
return nil
}

func validateTrigger(trigger v1alpha1.Trigger) error {
if err := validateTriggerTemplate(trigger.Template); err != nil {
return err
}
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
if err := validateDlqTrigger(&trigger); err != nil {
return err
}

return nil
}

// validateDlqTrigger validates trigger.atLeastOnce==true and the trigger.dlqTrigger
func validateDlqTrigger(trigger *v1alpha1.Trigger) error {
if trigger == nil {
return fmt.Errorf("trigger can't be nil")
}
if trigger.DlqTrigger == nil {
return nil
}
if !trigger.AtLeastOnce {
return fmt.Errorf("to use dlqTrigger, trigger.atLeastOnce must be set to true")
}
if !trigger.DlqTrigger.AtLeastOnce {
return fmt.Errorf("atLeastOnce must be set to true within the dlqTrigger")
}

return validateTrigger(*trigger.DlqTrigger)
}

// validateTriggerTemplate validates trigger template
func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error {
if template == nil {
Expand Down
80 changes: 80 additions & 0 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,86 @@ func TestValidTriggers(t *testing.T) {
assert.Equal(t, true, strings.Contains(err.Error(), "trigger template can't be nil"))
})

t.Run("vanilla dlqTrigger", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
AtLeastOnce: true,
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.Nil(t, err)
})

t.Run("!dlqTrigger.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "atLeastOnce must be set to true within the dlqTrigger"))
})

t.Run("dlqTrigger !.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "to use dlqTrigger, trigger.atLeastOnce must be set to true"))
})

t.Run("invalid conditions reset - cron", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Expand Down
40 changes: 40 additions & 0 deletions docs/sensors/more-about-sensors-and-triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,43 @@ spec:
# Optional
revisionHistoryLimit: 3
```

## Dead Letter Queue Trigger

To help avoid data loss and dropping a message on failure after all the retries are
exhausted, optionally, a `dlqTrigger` may be configured as following to invoke
any of the [10+ triggers](https://argoproj.github.io/argo-events/concepts/trigger/):

```yaml
spec:
triggers:
- template:
name: http-trigger
http:
url: https://xxxxx.com/
method: GET
# must be true for dlqTrigger
atLeastOnce: true
retryStrategy:
steps: 3
dlqTrigger:
template:
name: dlq-http-trigger
http:
url: https://xxxxx.com/
method: PUT
# must be true for dlqTrigger
atLeastOnce: true
# retries the dlqTrigger 5 times
retryStrategy:
steps: 5
```

If the trigger fails, it will retry up to the configured number of retries based
on `retryStrategy`. If the maximum retries are reached and the trigger, the
`dlqTrigger` will be invoked if specified. In order to use the `dlqTrigger`,
the `atLeastOnce` must be set to true within the trigger and the `dlqTrigger` for
the Sensor to know about the failure and invoke the `dlqTrigger`.

**note:** `dlqTrigger` is only available for the top level trigger and not
*recursively within the `dlqTrigger` template.
Loading

0 comments on commit 2451789

Please sign in to comment.