From 92c6d8e60b598f6db3b93e0e465a466686dad001 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Thu, 12 Apr 2018 15:53:41 +0200 Subject: [PATCH] Add rename fields processor (#6292) * Add rename fields processor The rename processor allows to rename fields before they are indexed to standardise on names or move fields around. This becomes useful when building filebeat modules which read from json files. With the rename processor no ingest pipeline is needed to follow the naming schema. This should make some modules simpler to build. It's also useful in combination with https://github.com/elastic/beats/pull/6024 to rename some fields according to the schema. ``` processors: - rename: fields: - from: "a" to: "b" ``` Intention of rename * Adjust fields to mapping * Prevent conflicts like `a` and `a.b` by renaming `a` to `a.value` Limitations * Will not overwrite keys --- CHANGELOG.asciidoc | 1 + auditbeat/auditbeat.reference.yml | 8 + filebeat/filebeat.reference.yml | 8 + heartbeat/heartbeat.reference.yml | 8 + libbeat/_meta/config.reference.yml | 8 + libbeat/docs/processors-using.asciidoc | 48 ++- libbeat/processors/actions/rename.go | 104 +++++++ libbeat/processors/actions/rename_test.go | 343 +++++++++++++++++++++ metricbeat/metricbeat.reference.yml | 8 + metricbeat/tests/system/test_processors.py | 30 +- packetbeat/packetbeat.reference.yml | 8 + winlogbeat/winlogbeat.reference.yml | 8 + 12 files changed, 579 insertions(+), 3 deletions(-) create mode 100644 libbeat/processors/actions/rename.go create mode 100644 libbeat/processors/actions/rename_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a45fb7fad0af..443595e0ba0b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add support for spooling to disk to the beats event publishing pipeline. {pull}6581[6581] - Added logging of system info at Beat startup. {issue}5946[5946] - Do not log errors if X-Pack Monitoring is enabled but Elastisearch X-Pack is not. {pull}6627[6627] +- Add rename processor. {pull}6292[6292] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 2e2499a5c67e..cbce75f67aff 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -242,6 +242,14 @@ auditbeat.modules: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index bab8874f33f0..bbca33f8e09f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -760,6 +760,14 @@ filebeat.inputs: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 8a64f40f240f..1acb33bbda8f 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -351,6 +351,14 @@ heartbeat.scheduler: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 8f46b24d5a97..986aab97f6c4 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -137,6 +137,14 @@ # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index bbe9caa3a476..49d72f5051c7 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -43,6 +43,7 @@ The supported processors are: * <> * <> * <> + * <> * <> * <> * <> @@ -54,8 +55,8 @@ Each condition receives a field to compare. You can specify multiple fields under the same condition by using `AND` between the fields (for example, `field1 AND field2`). -For each field, you can specify a simple field name or a nested map, for -example `dns.question.name`. +For each field, you can specify a simple field name or a nested map, for example +`dns.question.name`. See <> for a list of all the fields that are exported by {beatname_uc}. @@ -531,6 +532,49 @@ section. NOTE: If you define an empty list of fields under `include_fields`, then only the required fields, `@timestamp` and `type`, are exported. +[[rename-fields]] +=== Rename fields from events + +The `rename` processor specifies a list of fields to rename. Under the `fields` +key each entry contains a `from: old-key` and a `to: new-key` pair. `from` is +the origin and `to` the target name of the field. + +Renaming fields can be useful in cases where field names cause conflicts. For +example if an event has two fields, `c` and `c.b`, that are both assigned scalar +values (e.g. `{"c": 1, "c.b": 2}`) this will result in an Elasticsearch error at +ingest time. This is because the value of a cannot simultaneously be a scalar +and an object. To prevent this rename_fields can be used to rename `c` to +`c.value`. + +Rename fields cannot be used to overwrite fields. To overwrite fields either +first rename the target field or use the `drop_fields` processor to drop the +field and then rename the field. + +[source,yaml] +------- +processors: +- rename: + fields: + - from: "a.g" + to: "e.d" + ignore_missing: false + fail_on_error: true +------- + +The `rename` processor has the following configuration settings: + +`ignore_missing`:: (Optional) If set to true, no error is logged in case a key +which should be renamed is missing. Default is `false`. + +`fail_on_error`:: (Optional) If set to true, in case of an error the renaming of +fields is stopped and the original event is returned. If set to false, renaming +continues also if an error happened during renaming. Default is `true`. + +See <> for a list of supported conditions. + +You can specify multiple `ignore_missing` processors under the `processors` +section. + [[add-kubernetes-metadata]] === Add Kubernetes metadata diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go new file mode 100644 index 000000000000..dac988aaca20 --- /dev/null +++ b/libbeat/processors/actions/rename.go @@ -0,0 +1,104 @@ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +type renameFields struct { + config renameFieldsConfig +} + +type renameFieldsConfig struct { + Fields []fromTo `config:"fields"` + IgnoreMissing bool `config:"ignore_missing"` + FailOnError bool `config:"fail_on_error"` +} + +type fromTo struct { + From string `config:"from"` + To string `config:"to"` +} + +func init() { + processors.RegisterPlugin("rename", + configChecked(newRenameFields, + requireFields("fields"))) +} + +func newRenameFields(c *common.Config) (processors.Processor, error) { + + cfgwarn.Beta("Beta rename processor is used.") + config := renameFieldsConfig{ + IgnoreMissing: false, + FailOnError: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the rename configuration: %s", err) + } + + f := &renameFields{ + config: config, + } + return f, nil +} + +func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { + var backup common.MapStr + // Creates a copy of the event to revert in case of failure + if f.config.FailOnError { + backup = event.Fields.Clone() + } + + for _, field := range f.config.Fields { + err := f.renameField(field.From, field.To, event.Fields) + if err != nil && f.config.FailOnError { + logp.Debug("rename", "Failed to rename fields, revert to old event: %s", err) + event.Fields = backup + return event, err + } + } + + return event, nil +} + +func (f *renameFields) renameField(from string, to string, fields common.MapStr) error { + // Fields cannot be overwritten. Either the target field has to be dropped first or renamed first + exists, _ := fields.HasKey(to) + if exists { + return fmt.Errorf("target field %s already exists, drop or rename this field first", to) + } + + value, err := fields.GetValue(from) + if err != nil { + // Ignore ErrKeyNotFound errors + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %s", to, err) + } + + // Deletion must happen first to support cases where a becomes a.b + err = fields.Delete(from) + if err != nil { + return fmt.Errorf("could not delete key: %s, %+v", from, err) + } + + _, err = fields.Put(to, value) + if err != nil { + return fmt.Errorf("could not put value: %s: %v, %+v", to, value, err) + } + return nil +} + +func (f *renameFields) String() string { + return "rename=" + fmt.Sprintf("%+v", f.config.Fields) +} diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go new file mode 100644 index 000000000000..55bc17d0ac24 --- /dev/null +++ b/libbeat/processors/actions/rename_test.go @@ -0,0 +1,343 @@ +package actions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "reflect" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestRenameRun(t *testing.T) { + var tests = []struct { + description string + Fields []fromTo + IgnoreMissing bool + FailOnError bool + Input common.MapStr + Output common.MapStr + error bool + }{ + { + description: "simple field renaming", + Fields: []fromTo{ + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": "c", + }, + Output: common.MapStr{ + "b": "c", + }, + IgnoreMissing: false, + FailOnError: true, + error: false, + }, + { + description: "Add one more hierarchy to event", + Fields: []fromTo{ + { + From: "a.b", + To: "a.b.c", + }, + }, + Input: common.MapStr{ + "a.b": 1, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "b": common.MapStr{ + "c": 1, + }, + }, + }, + IgnoreMissing: false, + FailOnError: true, + error: false, + }, + { + description: "overwrites an existing field which is not allowed", + Fields: []fromTo{ + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "a": 2, + "b": "q", + }, + error: true, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "overwrites existing field but renames it first, order matters", + Fields: []fromTo{ + { + From: "b", + To: "c", + }, + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "b": 2, + "c": "q", + }, + error: false, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "take an invalid ES event with key / object conflict and convert it to a valid event", + Fields: []fromTo{ + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 5, + "a.b": 6, + }, + Output: common.MapStr{ + "a.b": 6, + "a": common.MapStr{ + "value": 5, + }, + }, + error: false, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "renames two fields into the same namespace. order matters as a is first key and then object", + Fields: []fromTo{ + { + From: "a", + To: "a.value", + }, + { + From: "c", + To: "a.c", + }, + }, + Input: common.MapStr{ + "a": 7, + "c": 8, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "value": 7, + "c": 8, + }, + }, + error: false, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "rename two fields into the same name space. this fails because a is already a key, renaming of a needs to happen first", + Fields: []fromTo{ + { + From: "c", + To: "a.c", + }, + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 9, + "c": 10, + }, + Output: common.MapStr{ + "a": 9, + "c": 10, + }, + error: true, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "renames conflicting keys. partially works because fail_on_error is false", + Fields: []fromTo{ + { + From: "c", + To: "a.c", + }, + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 9, + "c": 10, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "value": 9, + }, + }, + error: false, + IgnoreMissing: false, + FailOnError: false, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + f := &renameFields{ + config: renameFieldsConfig{ + Fields: test.Fields, + IgnoreMissing: test.IgnoreMissing, + FailOnError: test.FailOnError, + }, + } + event := &beat.Event{ + Fields: test.Input, + } + + newEvent, err := f.Run(event) + if !test.error { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + } + + assert.True(t, reflect.DeepEqual(newEvent.Fields, test.Output)) + }) + } +} + +func TestRenameField(t *testing.T) { + var tests = []struct { + From string + To string + ignoreMissing bool + failOnError bool + Input common.MapStr + Output common.MapStr + error bool + description string + }{ + { + description: "simple rename of field", + From: "a", + To: "c", + Input: common.MapStr{ + "a": "b", + }, + Output: common.MapStr{ + "c": "b", + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "Add hierarchy to event", + From: "a.b", + To: "a.b.c", + Input: common.MapStr{ + "a.b": 1, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "b": common.MapStr{ + "c": 1, + }, + }, + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "overwrite an existing field that should lead to an error", + From: "a", + To: "b", + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "a": 2, + "b": "q", + }, + error: true, + failOnError: true, + ignoreMissing: false, + }, + { + description: "resolve dotted event conflict", + From: "a", + To: "a.value", + Input: common.MapStr{ + "a": 5, + "a.b": 6, + }, + Output: common.MapStr{ + "a.b": 6, + "a": common.MapStr{ + "value": 5, + }, + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "try to rename no existing field with failOnError true", + From: "a", + To: "b", + Input: common.MapStr{ + "c": 5, + }, + Output: common.MapStr{ + "c": 5, + }, + failOnError: true, + ignoreMissing: false, + error: true, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + + f := &renameFields{ + config: renameFieldsConfig{ + IgnoreMissing: test.ignoreMissing, + FailOnError: test.failOnError, + }, + } + + err := f.renameField(test.From, test.To, test.Input) + if err != nil { + assert.Equal(t, test.error, true) + } + + assert.True(t, reflect.DeepEqual(test.Input, test.Output)) + }) + } +} diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 3bdea7d9b018..8fc815db056d 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -714,6 +714,14 @@ metricbeat.modules: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/metricbeat/tests/system/test_processors.py b/metricbeat/tests/system/test_processors.py index 1c83e2288407..840a7fefdba5 100644 --- a/metricbeat/tests/system/test_processors.py +++ b/metricbeat/tests/system/test_processors.py @@ -5,7 +5,7 @@ @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os") -class TestProcessors(metricbeat.BaseTest): +class Test(metricbeat.BaseTest): def test_drop_fields(self): @@ -259,3 +259,31 @@ def test_contradictory_multiple_actions(self): "system.process.memory.rss.pct" ]: assert key not in output + + def test_rename_field(self): + + self.render_config_template( + modules=[{ + "name": "system", + "metricsets": ["cpu"], + "period": "1s" + }], + processors=[{ + "rename": { + "fields": [{"from": "metricset.name", "to": "hello.world"}], + }, + }] + ) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + + output = self.read_output_json() + self.assertEqual(len(output), 1) + evt = output[0] + + print(evt) + print(evt.keys()) + + assert "name" not in output[0]["metricset"] + assert "cpu" in output[0]["hello"]["world"] diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 9d0e7e302ad3..88c5013f6140 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -614,6 +614,14 @@ packetbeat.protocols: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index a4de6f7f7e73..cbd999a7a863 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -166,6 +166,14 @@ winlogbeat.event_logs: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud.