Skip to content

Add if/then/else support to processors #10744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `overwrite` and `check_exists` settings to ILM support. {pull}10347[10347]
- Generate Kibana index pattern on demand instead of using a local file. {pull}10478[10478]
- Calls to Elasticsearch X-Pack APIs made by Beats won't cause deprecation logs in Elasticsearch logs. {9656}9656[9656]
- Add if/then/else support to processors. {pull}10744[10744]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewCondition(config *Config) (Condition, error) {
condition, err = NewNotCondition(inner)
}
default:
err = errors.New("missing condition")
err = errors.New("missing or invalid condition")
}
if err != nil {
return nil, err
Expand Down
26 changes: 26 additions & 0 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ condition is present, then the action is executed only if the condition is
fulfilled. If no condition is passed, then the action is always executed.
* `<parameters>` is the list of parameters to pass to the processor.

More complex conditional processing can be accomplished by using the
if-then-else processor configuration. This allows multiple processors to be
executed based on a single condition.

[source,yaml]
----
processors:
- if:
<condition>
then: <1>
- <processor_name>:
<parameters>
- <processor_name>:
<parameters>
...
else: <2>
- <processor_name>:
<parameters>
- <processor_name>:
<parameters>
...
----
<1> `then` must contain a single processor or a list of one or more processors
to execute when the condition evaluates to true.
<2> `else` is optional. It can contain a single processor or a list of
processors to execute when the conditional evaluate to false.

[[where-valid]]
==== Where are processors valid?
Expand Down
13 changes: 6 additions & 7 deletions libbeat/processors/actions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,19 @@ func testProcessors(t *testing.T, cases map[string]testCase) {
t.Fatalf("Failed to create config(%v): %+v", i, err)
}

var plugin map[string]*common.Config
if err := config.Unpack(&plugin); err != nil {
t.Fatalf("Failed to unpack config: %+v", err)
}

ps[i], err = processors.New(processors.PluginConfig{plugin})
ps[i], err = processors.New([]*common.Config{config})
if err != nil {
t.Fatalf("Failed to create add_tags processor(%v): %+v", i, err)
}
}

current := &beat.Event{Fields: test.event.Clone()}
for i, processor := range ps {
current = processor.Run(current)
var err error
current, err = processor.Run(current)
if err != nil {
t.Fatal(err)
}
if current == nil {
t.Fatalf("Event dropped(%v)", i)
}
Expand Down
95 changes: 86 additions & 9 deletions libbeat/processors/conditionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@ package processors

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/conditions"
"github.com/elastic/beats/libbeat/logp"
)

// WhenProcessor is a tuple of condition plus a Processor.
type WhenProcessor struct {
condition conditions.Condition
p Processor
}

// NewConditional returns a constructor suitable for registering when conditionals as a plugin.
func NewConditional(
ruleFactory Constructor,
Expand Down Expand Up @@ -60,15 +56,20 @@ func NewConditionList(config []conditions.Config) ([]conditions.Condition, error
return out, nil
}

// WhenProcessor is a tuple of condition plus a Processor.
type WhenProcessor struct {
condition conditions.Condition
p Processor
}

// NewConditionRule returns a processor that will execute the provided processor if the condition is true.
func NewConditionRule(
config conditions.Config,
p Processor,
) (Processor, error) {
cond, err := conditions.NewCondition(&config)
if err != nil {
logp.Err("Failed to initialize lookup condition: %v", err)
return nil, err
return nil, errors.Wrap(err, "failed to initialize condition")
}

if cond == nil {
Expand Down Expand Up @@ -108,3 +109,79 @@ func addCondition(

return NewConditionRule(condConfig, p)
}

type ifThenElseConfig struct {
Cond conditions.Config `config:"if" validate:"required"`
Then *common.Config `config:"then" validate:"required"`
Else *common.Config `config:"else"`
}

// IfThenElseProcessor executes one set of processors (then) if the condition is
// true and another set of processors (else) if the condition is false.
type IfThenElseProcessor struct {
cond conditions.Condition
then *Processors
els *Processors
}

// NewIfElseThenProcessor construct a new IfThenElseProcessor.
func NewIfElseThenProcessor(cfg *common.Config) (*IfThenElseProcessor, error) {
var config ifThenElseConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

cond, err := conditions.NewCondition(&config.Cond)
if err != nil {
return nil, err
}

newProcessors := func(c *common.Config) (*Processors, error) {
if c == nil {
return nil, nil
}
if !c.IsArray() {
return New([]*common.Config{c})
}

var pc PluginConfig
if err := c.Unpack(&pc); err != nil {
return nil, err
}
return New(pc)
}

var ifProcessors, elseProcessors *Processors
if ifProcessors, err = newProcessors(config.Then); err != nil {
return nil, err
}
if elseProcessors, err = newProcessors(config.Else); err != nil {
return nil, err
}

return &IfThenElseProcessor{cond, ifProcessors, elseProcessors}, nil
}

// Run checks the if condition and executes the processors attached to the
// then statement or the else statement based on the condition.
func (p *IfThenElseProcessor) Run(event *beat.Event) (*beat.Event, error) {
if p.cond.Check(event) {
return p.then.Run(event)
} else if p.els != nil {
return p.els.Run(event)
}
return event, nil
}

func (p *IfThenElseProcessor) String() string {
var sb strings.Builder
sb.WriteString("if ")
sb.WriteString(p.cond.String())
sb.WriteString(" then ")
sb.WriteString(p.then.String())
if p.els != nil {
sb.WriteString(" else ")
sb.WriteString(p.els.String())
}
return sb.String()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this, I see that we do a nil check for the else branch, but looking at the code the then branch could be also nil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the config is Unpacked the then field is is validated via validate:"required". This prevents it from ever being nil.

108 changes: 108 additions & 0 deletions libbeat/processors/conditionals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,111 @@ func TestConditionRuleInitErrorPropagates(t *testing.T) {
assert.Equal(t, testErr, err)
assert.Nil(t, filter)
}

type testCase struct {
event common.MapStr
want common.MapStr
cfg string
}

func testProcessors(t *testing.T, cases map[string]testCase) {
for name, test := range cases {
test := test
t.Run(name, func(t *testing.T) {
c, err := common.NewConfigWithYAML([]byte(test.cfg), "test "+name)
if err != nil {
t.Fatal(err)
}

var pluginConfig PluginConfig
if err = c.Unpack(&pluginConfig); err != nil {
t.Fatal(err)
}

processor, err := New(pluginConfig)
if err != nil {
t.Fatal(err)
}

result, err := processor.Run(&beat.Event{Fields: test.event.Clone()})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, test.want, result.Fields)
})
}
}

func TestIfElseThenProcessor(t *testing.T) {
const ifThen = `
- if:
range.uid.lt: 500
then:
- add_fields: {target: "", fields: {uid_type: reserved}}
`

const ifThenElse = `
- if:
range.uid.lt: 500
then:
- add_fields: {target: "", fields: {uid_type: reserved}}
else:
- add_fields: {target: "", fields: {uid_type: user}}
`

const ifThenElseSingleProcessor = `
- if:
range.uid.lt: 500
then:
add_fields: {target: "", fields: {uid_type: reserved}}
else:
add_fields: {target: "", fields: {uid_type: user}}
`

const ifThenElseIf = `
- if:
range.uid.lt: 500
then:
- add_fields: {target: "", fields: {uid_type: reserved}}
else:
if:
equals.uid: 500
then:
add_fields: {target: "", fields: {uid_type: "eq_500"}}
else:
add_fields: {target: "", fields: {uid_type: "gt_500"}}
`

testProcessors(t, map[string]testCase{
"if-then-true": {
event: common.MapStr{"uid": 411},
want: common.MapStr{"uid": 411, "uid_type": "reserved"},
cfg: ifThen,
},
"if-then-false": {
event: common.MapStr{"uid": 500},
want: common.MapStr{"uid": 500},
cfg: ifThen,
},
"if-then-else-true": {
event: common.MapStr{"uid": 411},
want: common.MapStr{"uid": 411, "uid_type": "reserved"},
cfg: ifThenElse,
},
"if-then-else-false": {
event: common.MapStr{"uid": 500},
want: common.MapStr{"uid": 500, "uid_type": "user"},
cfg: ifThenElse,
},
"if-then-else-false-single-processor": {
event: common.MapStr{"uid": 500},
want: common.MapStr{"uid": 500, "uid_type": "user"},
cfg: ifThenElseSingleProcessor,
},
"if-then-else-if": {
event: common.MapStr{"uid": 500},
want: common.MapStr{"uid": 500, "uid_type": "eq_500"},
cfg: ifThenElseIf,
},
})
}
3 changes: 2 additions & 1 deletion libbeat/processors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/elastic/beats/libbeat/common"
)

type PluginConfig []map[string]*common.Config
// PluginConfig represents the list of processors.
type PluginConfig []*common.Config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the comment maybe Processors or List might be a better name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to change it because it's public and used in multiple places in the code. But I agree the name isn't that great.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah its public :(


// fields that should be always exported
var MandatoryExportedFields = []string{"type"}
13 changes: 7 additions & 6 deletions libbeat/processors/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package processors

import (
"errors"
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ func (ns *Namespace) add(names []string, p pluginer) error {
// register plugin if intermediate node in path being processed
if len(names) == 1 {
if _, found := ns.reg[name]; found {
return errors.New("exists already")
return errors.Errorf("%v exists already", name)
}

ns.reg[name] = p
Expand Down Expand Up @@ -94,20 +95,20 @@ func (ns *Namespace) Plugin() Constructor {
}

if section != "" {
return nil, fmt.Errorf("Too many lookup modules configured (%v, %v)",
section, name)
return nil, errors.Errorf("too many lookup modules "+
"configured (%v, %v)", section, name)
}

section = name
}

if section == "" {
return nil, errors.New("No lookup module configured")
return nil, errors.New("no lookup module configured")
}

backend, found := ns.reg[section]
if !found {
return nil, fmt.Errorf("Unknown lookup module: %v", section)
return nil, errors.Errorf("unknown lookup module: %v", section)
}

config, err := cfg.Child(section, -1)
Expand Down
Loading