diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a45fb7fad0af..443595e0ba0b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add support for spooling to disk to the beats event publishing pipeline. {pull}6581[6581] - Added logging of system info at Beat startup. {issue}5946[5946] - Do not log errors if X-Pack Monitoring is enabled but Elastisearch X-Pack is not. {pull}6627[6627] +- Add rename processor. {pull}6292[6292] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 2e2499a5c67e..cbce75f67aff 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -242,6 +242,14 @@ auditbeat.modules: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index bab8874f33f0..bbca33f8e09f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -760,6 +760,14 @@ filebeat.inputs: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. 
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 8a64f40f240f..1acb33bbda8f 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -351,6 +351,14 @@ heartbeat.scheduler: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 8f46b24d5a97..986aab97f6c4 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -137,6 +137,14 @@ # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index bbe9caa3a476..49d72f5051c7 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -43,6 +43,7 @@ The supported processors are: * <> * <> * <> + * <> * <> * <> * <> @@ -54,8 +55,8 @@ Each condition receives a field to compare. You can specify multiple fields under the same condition by using `AND` between the fields (for example, `field1 AND field2`). -For each field, you can specify a simple field name or a nested map, for -example `dns.question.name`. +For each field, you can specify a simple field name or a nested map, for example +`dns.question.name`. See <> for a list of all the fields that are exported by {beatname_uc}. @@ -531,6 +532,49 @@ section. 
NOTE: If you define an empty list of fields under `include_fields`, then only the required fields, `@timestamp` and `type`, are exported. +[[rename-fields]] +=== Rename fields from events + +The `rename` processor specifies a list of fields to rename. Under the `fields` +key each entry contains a `from: old-key` and a `to: new-key` pair. `from` is +the origin and `to` the target name of the field. + +Renaming fields can be useful in cases where field names cause conflicts. For +example, if an event has two fields, `c` and `c.b`, that are both assigned scalar +values (e.g. `{"c": 1, "c.b": 2}`), this will result in an Elasticsearch error at +ingest time. This is because the value of `c` cannot simultaneously be a scalar +and an object. To prevent this, the `rename` processor can be used to rename `c` +to `c.value`. + +The `rename` processor cannot be used to overwrite fields. To overwrite a field, +either first rename the target field or use the `drop_fields` processor to drop +it, and then rename the field. + +[source,yaml] +------- +processors: +- rename: + fields: + - from: "a.g" + to: "e.d" + ignore_missing: false + fail_on_error: true +------- + +The `rename` processor has the following configuration settings: + +`ignore_missing`:: (Optional) If set to true, no error is logged in case a key +which should be renamed is missing. Default is `false`. + +`fail_on_error`:: (Optional) If set to true, in case of an error the renaming of +fields is stopped and the original event is returned. If set to false, renaming +continues even if an error occurs during renaming. Default is `true`. + +See <<conditions>> for a list of supported conditions. + +You can specify multiple `rename` processors under the `processors` +section. 
+ [[add-kubernetes-metadata]] === Add Kubernetes metadata diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go new file mode 100644 index 000000000000..dac988aaca20 --- /dev/null +++ b/libbeat/processors/actions/rename.go @@ -0,0 +1,109 @@ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +// renameFields is the processor that renames event fields as listed in its config. +type renameFields struct { + config renameFieldsConfig +} + +type renameFieldsConfig struct { + Fields []fromTo `config:"fields"` + IgnoreMissing bool `config:"ignore_missing"` + FailOnError bool `config:"fail_on_error"` +} + +type fromTo struct { + From string `config:"from"` + To string `config:"to"` +} + +func init() { + processors.RegisterPlugin("rename", + configChecked(newRenameFields, + requireFields("fields"))) +} + +func newRenameFields(c *common.Config) (processors.Processor, error) { + + cfgwarn.Beta("Beta rename processor is used.") + config := renameFieldsConfig{ + IgnoreMissing: false, + FailOnError: true, + } + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the rename configuration: %s", err) + } + + f := &renameFields{ + config: config, + } + return f, nil +} + +// Run applies every configured rename in order. With fail_on_error set, the first +// failure restores the cloned fields and returns the error; otherwise errors are skipped. +func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { + var backup common.MapStr + // Creates a copy of the event to revert in case of failure + if f.config.FailOnError { + backup = event.Fields.Clone() + } + + for _, field := range f.config.Fields { + err := f.renameField(field.From, field.To, event.Fields) + if err != nil && f.config.FailOnError { + logp.Debug("rename", "Failed to rename fields, revert to old event: %s", err) + event.Fields = backup + return event, err + } + } + + return event, nil +} + +// renameField moves the value stored under key from to key to inside fields. It refuses to +// overwrite an existing target and ignores a missing source only if ignore_missing is set. +func (f *renameFields) renameField(from string, to string, fields 
common.MapStr) error { + // Fields cannot be overwritten. Either the target field has to be dropped first or renamed first + exists, _ := fields.HasKey(to) + if exists { + return fmt.Errorf("target field %s already exists, drop or rename this field first", to) + } + + value, err := fields.GetValue(from) + if err != nil { + // Ignore ErrKeyNotFound errors + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err) + } + + // Deletion must happen first to support cases where a becomes a.b + err = fields.Delete(from) + if err != nil { + return fmt.Errorf("could not delete key: %s, %+v", from, err) + } + + _, err = fields.Put(to, value) + if err != nil { + return fmt.Errorf("could not put value: %s: %v, %+v", to, value, err) + } + return nil +} + +func (f *renameFields) String() string { + return "rename=" + fmt.Sprintf("%+v", f.config.Fields) +} diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go new file mode 100644 index 000000000000..55bc17d0ac24 --- /dev/null +++ b/libbeat/processors/actions/rename_test.go @@ -0,0 +1,343 @@ +package actions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "reflect" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestRenameRun(t *testing.T) { + var tests = []struct { + description string + Fields []fromTo + IgnoreMissing bool + FailOnError bool + Input common.MapStr + Output common.MapStr + error bool + }{ + { + description: "simple field renaming", + Fields: []fromTo{ + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": "c", + }, + Output: common.MapStr{ + "b": "c", + }, + IgnoreMissing: false, + FailOnError: true, + error: false, + }, + { + description: "Add one more hierarchy to event", + Fields: []fromTo{ + { + From: "a.b", + To: "a.b.c", + }, + }, + Input: common.MapStr{ + "a.b": 1, + }, 
+ Output: common.MapStr{ + "a": common.MapStr{ + "b": common.MapStr{ + "c": 1, + }, + }, + }, + IgnoreMissing: false, + FailOnError: true, + error: false, + }, + { + description: "overwrites an existing field which is not allowed", + Fields: []fromTo{ + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "a": 2, + "b": "q", + }, + error: true, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "overwrites existing field but renames it first, order matters", + Fields: []fromTo{ + { + From: "b", + To: "c", + }, + { + From: "a", + To: "b", + }, + }, + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "b": 2, + "c": "q", + }, + error: false, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "take an invalid ES event with key / object conflict and convert it to a valid event", + Fields: []fromTo{ + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 5, + "a.b": 6, + }, + Output: common.MapStr{ + "a.b": 6, + "a": common.MapStr{ + "value": 5, + }, + }, + error: false, + FailOnError: true, + IgnoreMissing: false, + }, + { + description: "renames two fields into the same namespace. order matters as a is first key and then object", + Fields: []fromTo{ + { + From: "a", + To: "a.value", + }, + { + From: "c", + To: "a.c", + }, + }, + Input: common.MapStr{ + "a": 7, + "c": 8, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "value": 7, + "c": 8, + }, + }, + error: false, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "rename two fields into the same name space. 
this fails because a is already a key, renaming of a needs to happen first", + Fields: []fromTo{ + { + From: "c", + To: "a.c", + }, + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 9, + "c": 10, + }, + Output: common.MapStr{ + "a": 9, + "c": 10, + }, + error: true, + IgnoreMissing: false, + FailOnError: true, + }, + { + description: "renames conflicting keys. partially works because fail_on_error is false", + Fields: []fromTo{ + { + From: "c", + To: "a.c", + }, + { + From: "a", + To: "a.value", + }, + }, + Input: common.MapStr{ + "a": 9, + "c": 10, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "value": 9, + }, + }, + error: false, + IgnoreMissing: false, + FailOnError: false, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + f := &renameFields{ + config: renameFieldsConfig{ + Fields: test.Fields, + IgnoreMissing: test.IgnoreMissing, + FailOnError: test.FailOnError, + }, + } + event := &beat.Event{ + Fields: test.Input, + } + + newEvent, err := f.Run(event) + if !test.error { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + } + + assert.True(t, reflect.DeepEqual(newEvent.Fields, test.Output)) + }) + } +} + +func TestRenameField(t *testing.T) { + var tests = []struct { + From string + To string + ignoreMissing bool + failOnError bool + Input common.MapStr + Output common.MapStr + error bool + description string + }{ + { + description: "simple rename of field", + From: "a", + To: "c", + Input: common.MapStr{ + "a": "b", + }, + Output: common.MapStr{ + "c": "b", + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "Add hierarchy to event", + From: "a.b", + To: "a.b.c", + Input: common.MapStr{ + "a.b": 1, + }, + Output: common.MapStr{ + "a": common.MapStr{ + "b": common.MapStr{ + "c": 1, + }, + }, + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "overwrite an existing field that should lead to an error", 
+ From: "a", + To: "b", + Input: common.MapStr{ + "a": 2, + "b": "q", + }, + Output: common.MapStr{ + "a": 2, + "b": "q", + }, + error: true, + failOnError: true, + ignoreMissing: false, + }, + { + description: "resolve dotted event conflict", + From: "a", + To: "a.value", + Input: common.MapStr{ + "a": 5, + "a.b": 6, + }, + Output: common.MapStr{ + "a.b": 6, + "a": common.MapStr{ + "value": 5, + }, + }, + error: false, + failOnError: true, + ignoreMissing: false, + }, + { + description: "try to rename no existing field with failOnError true", + From: "a", + To: "b", + Input: common.MapStr{ + "c": 5, + }, + Output: common.MapStr{ + "c": 5, + }, + failOnError: true, + ignoreMissing: false, + error: true, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + + f := &renameFields{ + config: renameFieldsConfig{ + IgnoreMissing: test.ignoreMissing, + FailOnError: test.failOnError, + }, + } + + err := f.renameField(test.From, test.To, test.Input) + if (err != nil) != test.error { // an error must occur exactly when the case expects one + t.Errorf("renameField error = %v, want error: %v", err, test.error) + } + + assert.True(t, reflect.DeepEqual(test.Input, test.Output)) + }) + } +} diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 3bdea7d9b018..8fc815db056d 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -714,6 +714,14 @@ metricbeat.modules: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. 
diff --git a/metricbeat/tests/system/test_processors.py b/metricbeat/tests/system/test_processors.py index 1c83e2288407..840a7fefdba5 100644 --- a/metricbeat/tests/system/test_processors.py +++ b/metricbeat/tests/system/test_processors.py @@ -5,7 +5,7 @@ @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os") -class TestProcessors(metricbeat.BaseTest): +class Test(metricbeat.BaseTest): def test_drop_fields(self): @@ -259,3 +259,31 @@ def test_contradictory_multiple_actions(self): "system.process.memory.rss.pct" ]: assert key not in output + + def test_rename_field(self): + + self.render_config_template( + modules=[{ + "name": "system", + "metricsets": ["cpu"], + "period": "1s" + }], + processors=[{ + "rename": { + "fields": [{"from": "metricset.name", "to": "hello.world"}], + }, + }] + ) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + + output = self.read_output_json() + self.assertEqual(len(output), 1) + evt = output[0] + + print(evt) + print(evt.keys()) + + assert "name" not in output[0]["metricset"] + assert "cpu" in output[0]["hello"]["world"] diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 9d0e7e302ad3..88c5013f6140 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -614,6 +614,14 @@ packetbeat.protocols: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. 
diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index a4de6f7f7e73..cbd999a7a863 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -166,6 +166,14 @@ winlogbeat.event_logs: # equals: # http.code: 200 # +# The following example renames the field a to b: +# +#processors: +#- rename: +# fields: +# - from: "a" +# to: "b" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud.