From 5ecd98ed21b78ff4259e4d19c22a3b60aa37cf06 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 19 Mar 2020 08:56:52 -0400 Subject: [PATCH] Fix data race in convert processor (#17032) (#17045) If the convert processor was used in the global context it could lead to data races because there was a variable that was reused across executions. When processors are used in the global context they are shared across individual publisher clients so you could end up with a data race. The fix here was to replace the shared state with a local variable. In testing this didn't make much of a difference in the number of allocations. (cherry picked from commit b6167aebf788c121efa252e55e1c55603b411cb9) --- CHANGELOG.next.asciidoc | 1 + libbeat/processors/convert/convert.go | 28 +++++--------- libbeat/processors/convert/convert_test.go | 43 ++++++++++++++++++++++ 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a0b82cc978fc..8125689eb220 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `NewContainerMetadataEnricher` to use default config for kubernetes module. {pull}16857[16857] - Improve some logging messages for add_kubernetes_metadata processor {pull}16866{16866} - Fix k8s metadata issue regarding node labels not shown up on root level of metadata. {pull}16834[16834] +- Fix concurrency issues in convert processor when used in the global context. {pull}17032[17032] *Auditbeat* diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index c6f3e5aea285..b781f37ed4a5 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -35,6 +35,8 @@ import ( const logName = "processor.convert" +var ignoredFailure = struct{}{} + func init() { processors.RegisterPlugin("convert", New) jsprocessor.RegisterPlugin("Convert", New) @@ -43,8 +45,6 @@ func init() { type processor struct { config log *logp.Logger - - converted []interface{} // Temporary storage for converted values. } // New constructs a new convert processor. @@ -63,7 +63,7 @@ func newConvert(c config) (*processor, error) { log = log.With("instance_id", c.Tag) } - return &processor{config: c, log: log, converted: make([]interface{}, len(c.Fields))}, nil + return &processor{config: c, log: log}, nil } func (p *processor) String() string { @@ -71,19 +71,11 @@ func (p *processor) String() string { return "convert=" + string(json) } -var ignoredFailure = struct{}{} - -func resetValues(s []interface{}) { - for i := range s { - s[i] = nil - } -} - func (p *processor) Run(event *beat.Event) (*beat.Event, error) { - defer resetValues(p.converted) + converted := make([]interface{}, len(p.Fields)) // Convert the fields and write the results to temporary storage. - if err := p.convertFields(event); err != nil { + if err := p.convertFields(event, converted); err != nil { return event, err } @@ -99,14 +91,14 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { } // Update the event with the converted values. - if err := p.writeToEvent(event); err != nil { + if err := p.writeToEvent(event, converted); err != nil { return &saved, err } return event, nil } -func (p *processor) convertFields(event *beat.Event) error { +func (p *processor) convertFields(event *beat.Event, converted []interface{}) error { // Write conversion results to temporary storage. for i, conv := range p.Fields { v, err := p.convertField(event, conv) @@ -116,7 +108,7 @@ func (p *processor) convertFields(event *beat.Event) error { } v = ignoredFailure } - p.converted[i] = v + converted[i] = v } return nil @@ -142,9 +134,9 @@ func (p *processor) convertField(event *beat.Event, conversion field) (interface return v, nil } -func (p *processor) writeToEvent(event *beat.Event) error { +func (p *processor) writeToEvent(event *beat.Event, converted []interface{}) error { for i, conversion := range p.Fields { - v := p.converted[i] + v := converted[i] if v == ignoredFailure { continue } diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index 1eba3ff550fe..f761b047f4c9 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -424,3 +424,46 @@ func TestDataTypes(t *testing.T) { }) } } + +func BenchmarkTestConvertRun(b *testing.B) { + c := defaultConfig() + c.IgnoreMissing = true + c.Fields = append(c.Fields, + field{From: "source.address", To: "source.ip", Type: IP}, + field{From: "destination.address", To: "destination.ip", Type: IP}, + field{From: "a", To: "b"}, + field{From: "c", To: "d"}, + field{From: "e", To: "f"}, + field{From: "g", To: "h"}, + field{From: "i", To: "j"}, + field{From: "k", To: "l"}, + field{From: "m", To: "n"}, + field{From: "o", To: "p"}, + ) + + p, err := newConvert(c) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + event := &beat.Event{ + Fields: common.MapStr{ + "source": common.MapStr{ + "address": "192.51.100.1", + }, + "destination": common.MapStr{ + "address": "192.0.2.51", + }, + }, + } + + _, err := p.Run(event) + if err != nil { + b.Fatal(err) + } + } + }) +}