Skip to content

Commit

Permalink
Functionbeat: add index option to function configuration (#15101)
Browse files Browse the repository at this point in the history
* Refactoring: extract processor adding code to own function

* Adding CHANGELOG entry

* Fleshing out TODOs

* Refactoring: extracting processors code into separate file for isolating tests

* Fleshing out unit tests

* Adding doc

* Fixing formatting
  • Loading branch information
ycombinator authored Dec 17, 2019
1 parent 8e31628 commit f163dfe
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add input for Cloudwatch logs through Kinesis. {pull}13317[13317]
- Enable Logstash output. {pull}13345[13345]
- Make `bulk_max_size` configurable in outputs. {pull}13493[13493]
- Add `index` option to all functions to directly set a per-function index value. {issue}15064[15064] {pull}15101[15101]

*Winlogbeat*

Expand Down
13 changes: 13 additions & 0 deletions x-pack/functionbeat/docs/config-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,16 @@ The default is trim_horizon.

If this option is set to true, fields with `null` values will be published in
the output document. By default, `keep_null` is set to `false`.

[float]
[id="{beatname_lc}-index"]
==== `index`

If present, this formatted string overrides the index for events from this function
(for Elasticsearch outputs), or sets the `raw_index` field of the event's
metadata (for other outputs). This string can only refer to the agent name,
the agent version, and the event timestamp; for access to dynamic fields, use
`output.elasticsearch.index` or a processor.

Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might
expand to `"functionbeat-myindex-2019.12.13"`.
31 changes: 19 additions & 12 deletions x-pack/functionbeat/function/beater/functionbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"time"

"github.com/elastic/beats/libbeat/common/fmtstr"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -102,7 +104,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error {
bt.log.Info("Functionbeat is running")
defer bt.log.Info("Functionbeat stopped running")

clientFactory := makeClientFactory(bt.log, b.Publisher)
clientFactory := makeClientFactory(bt.log, b.Publisher, b.Info)

enabledFunctions := bt.enabledFunctions()
bt.log.Infof("Functionbeat is configuring enabled functions: %s", strings.Join(enabledFunctions, ", "))
Expand Down Expand Up @@ -149,33 +151,38 @@ func isOutputSupported(name string) bool {
return false
}

func makeClientFactory(log *logp.Logger, pipeline beat.Pipeline) func(*common.Config) (core.Client, error) {
// fnExtraConfig holds the per-function settings that the client factory
// unpacks from a function's configuration.
type fnExtraConfig struct {
// Processors is the configuration of the user-defined processors for the function.
Processors processors.PluginConfig `config:"processors"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`

common.EventMetadata `config:",inline"` // Fields and tags to add to events.

// Index is an optional index format string for events from this function.
// When it is not empty, processorsForFunction turns it into an
// add_formatted_index processor. Per the user documentation it overrides the
// index for Elasticsearch outputs and sets the raw_index metadata field for
// other outputs.
Index fmtstr.EventFormatString `config:"index"`
}

func makeClientFactory(log *logp.Logger, pipeline beat.Pipeline, beatInfo beat.Info) func(*common.Config) (core.Client, error) {
// Each function has its own client to the publisher pipeline.
// The publish operation blocks the calling thread; when the method returns, we have received the
// ACK for the batch.
return func(cfg *common.Config) (core.Client, error) {
c := struct {
Processors processors.PluginConfig `config:"processors"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`

common.EventMetadata `config:",inline"` // Fields and tags to add to events.
}{}
c := fnExtraConfig{}

if err := cfg.Unpack(&c); err != nil {
return nil, err
}

processors, err := processors.New(c.Processors)
funcProcessors, err := processorsForFunction(beatInfo, c)
if err != nil {
return nil, err
}

client, err := core.NewSyncClient(log, pipeline, beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
Processor: processors,
Processor: funcProcessors,
EventMetadata: c.EventMetadata,
KeepNull: c.KeepNull,
},
Expand Down
151 changes: 151 additions & 0 deletions x-pack/functionbeat/function/beater/proccessors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package beater

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
_ "github.com/elastic/beats/libbeat/processors/actions"
)

// TestProcessorsForFunction builds the processors for a set of function
// configurations, runs them against an event, and checks that the resulting
// event contains the expected string fields.
func TestProcessorsForFunction(t *testing.T) {
	testCases := map[string]struct {
		beatInfo       beat.Info
		configStr      string
		event          beat.Event
		expectedFields map[string]string
	}{
		"Simple static index": {
			configStr: "index: 'test'",
			expectedFields: map[string]string{
				"@metadata.raw_index": "test",
			},
		},
		"Index with agent info + timestamp": {
			beatInfo:  beat.Info{Beat: "TestBeat", Version: "3.9.27"},
			configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
			event:     beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
			expectedFields: map[string]string{
				"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
			},
		},
		"Set field in input config": {
			configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
			expectedFields: map[string]string{
				"fields.testField": "inputConfig",
			},
		},
	}
	for description, test := range testCases {
		// Re-bind the loop variables so each subtest closure owns its copy.
		description, test := description, test
		t.Run(description, func(t *testing.T) {
			if test.event.Fields == nil {
				test.event.Fields = common.MapStr{}
			}
			config, err := functionConfigFromString(test.configStr)
			if err != nil {
				t.Fatal(err)
			}
			// Named procs (not processors) to avoid shadowing the imported package.
			procs, err := processorsForFunction(test.beatInfo, config)
			if err != nil {
				t.Fatal(err)
			}
			processedEvent, err := procs.Run(&test.event)
			// We don't check if err != nil, because we are testing the final outcome
			// of running the processors, including when some of them fail.
			if processedEvent == nil {
				// Stop here: every check below dereferences processedEvent.
				t.Fatalf("Unexpected fatal error running processors: %v", err)
			}
			for key, value := range test.expectedFields {
				field, err := processedEvent.GetValue(key)
				if err != nil {
					t.Errorf("Couldn't get field %s from event: %v", key, err)
					continue
				}
				// Note that requiring a string here is just to simplify the test setup,
				// not a requirement of the underlying api. assert.Equal takes the
				// expected value first and also fails when field is not a string.
				assert.Equal(t, value, field, "Event field [%s]", key)
			}
		})
	}
}

// TestProcessorsForFunctionIsFlat checks that processors listed in a function's
// configuration are returned as direct entries of the resulting
// processors.Processors list.
func TestProcessorsForFunctionIsFlat(t *testing.T) {
	// This test is regrettable, and exists because of inconsistencies in
	// processor handling between processors.Processors and processing.group
	// (which implements beat.ProcessorList). The upshot is that, for now, if the
	// function configuration specifies processors, processorsForFunction must
	// return them as direct children of the resulting processors.Processors
	// (rather than collecting them in additional tree structure).
	// This test should be removed once we have a more consistent mechanism for
	// collecting and running processors.
	configStr := `processors:
- add_fields: {fields: {testField: value}}
- add_fields: {fields: {testField2: stuff}}`
	config, err := functionConfigFromString(configStr)
	if err != nil {
		t.Fatal(err)
	}
	// Named procs (not processors) to avoid shadowing the imported package.
	procs, err := processorsForFunction(beat.Info{}, config)
	if err != nil {
		t.Fatal(err)
	}
	// Two configured processors must yield exactly two top-level entries.
	assert.Len(t, procs.List, 2)
}

// setRawIndex is a bare-bones processor that sets the raw_index field in the
// event metadata to a constant string. It is meant for testing the order of
// operations when processor lists are assembled.
// NOTE(review): the original comment names processorsForConfig, which is not
// visible in this file; confirm whether processorsForFunction is the intended
// target.
type setRawIndex struct {
// indexStr is the value Run stores under the "raw_index" metadata key.
indexStr string
}

// Run stores the configured index string under the "raw_index" key of the
// event's metadata, creating the metadata map first when the event has none.
// It returns the same event and never returns an error.
func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
	meta := event.Meta
	if meta == nil {
		meta = common.MapStr{}
		event.Meta = meta
	}
	meta["raw_index"] = p.indexStr
	return event, nil
}

// String returns a short description of the processor that includes the index
// string it sets.
func (p *setRawIndex) String() string {
	const format = "set_raw_index=%v"
	return fmt.Sprintf(format, p.indexStr)
}

// functionConfigFromString parses the YAML string s and unpacks it into a
// fnExtraConfig. On failure it returns the error together with the config as
// populated up to that point.
func functionConfigFromString(s string) (fnExtraConfig, error) {
	var unpacked fnExtraConfig

	parsed, err := common.NewConfigFrom(s)
	if err != nil {
		return unpacked, err
	}
	if err := parsed.Unpack(&unpacked); err != nil {
		return unpacked, err
	}
	return unpacked, nil
}

// makeProcessors builds a new processors.Processors list whose entries are
// exactly the given bare Processor values, in the order supplied.
func makeProcessors(procs ...processors.Processor) *processors.Processors {
	wrapped := processors.NewList(nil)
	wrapped.List = procs
	return wrapped
}
38 changes: 38 additions & 0 deletions x-pack/functionbeat/function/beater/processors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package beater

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/add_formatted_index"
)

// processorsForFunction assembles the processor list for a single function.
//
// The order of the list matters: when config.Index is set, a processor that
// applies the formatted index is added first, and the user-configured
// processors from config.Processors are appended after it.
//
// It returns an error if the index format string cannot be converted to a
// timestamp format string or if the user processors cannot be created.
func processorsForFunction(beatInfo beat.Info, config fnExtraConfig) (*processors.Processors, error) {
	result := processors.NewList(nil)

	// Step 1: the index processor, only when an index was configured.
	if hasIndex := !config.Index.IsEmpty(); hasIndex {
		beatFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
		indexFormat, err := fmtstr.NewTimestampFormatString(&config.Index, beatFields)
		if err != nil {
			return nil, err
		}
		result.AddProcessor(add_formatted_index.New(indexFormat))
	}

	// Step 2: the processors the user listed in the function configuration.
	fromConfig, err := processors.New(config.Processors)
	if err != nil {
		return nil, err
	}
	result.AddProcessors(*fromConfig)

	return result, nil
}

0 comments on commit f163dfe

Please sign in to comment.