Skip to content

Commit

Permalink
Implement setting to keep null values in events (elastic#13928)
Browse files Browse the repository at this point in the history
* Implement setting to allow emitting of null values in events

* Rename emit_null_values -> keep_null

* Move KeepNull into beat.ProcessingConfig

* Allow keep_null setting in Metricbeat module configurations

* Allowing x-pack Filebeat inputs to take `keep_null` option

* Allowing Functionbeat to take `keep_null` option

* Allow Heartbeat to take `keep_null` setting

* Allow Winlogbeat to take `keep_null` option

* Allow Packetbeat to take `keep_null` option

* Adding CHANGELOG

* Updating configuration files with new setting

* Documenting setting for Filebeat

* Documenting setting for Metricbeat

* Documenting setting for Auditbeat

* Documenting setting for Packetbeat

* Documenting setting for Winlogbeat

* Documenting setting for Heartbeat

* Documenting setting for Functionbeat

* Updating reference config file for Auditbeat

* Fixing syntax

* Reverting changes to Metricbeat module references config files

* Removing redundant KeepNull config option from x-pack/filebeat
  • Loading branch information
ycombinator authored Oct 15, 2019
1 parent ed9dba5 commit ec7a9a2
Show file tree
Hide file tree
Showing 44 changed files with 398 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Do not check for alias when setup.ilm.check_exists is false. {pull}13848[13848]
- Add support for numeric time zone offsets in timestamp processor. {pull}13902[13902]
- Marking Central Management deprecated. {pull}14018[14018]
- Add `keep_null` setting to allow Beats to publish null values in events. {issue}5522[5522] {pull}13928[13928]

*Auditbeat*

Expand Down
7 changes: 7 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ auditbeat.modules:
rate_limit: 0
include_raw_message: false
include_warnings: false

# Set to true to publish fields with null values in events.
#keep_null: false

# Load audit rules from separate files. Same format as audit.rules(7).
audit_rule_files: [ '${path.config}/audit.rules.d/*.conf' ]
audit_rules: |
Expand Down Expand Up @@ -110,6 +114,9 @@ auditbeat.modules:
# Detect changes to files included in subdirectories. Disabled by default.
recursive: false

# Set to true to publish fields with null values in events.
#keep_null: false


#================================ General ======================================

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/docs/modules/auditd.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ time.
- `none`: No backpressure mitigation measures are enabled.
--

*`keep_null`*:: If this option is set to true, fields with `null` values will be
published in the output document. By default, `keep_null` is set to `false`.

[float]
=== Audit rules

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/docs/modules/file_integrity.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ of these directories are watched. If `recursive` is set to `true`, the
`file_integrity` module will watch for changes on these directories and all
their subdirectories.

*`keep_null`*:: If this option is set to true, fields with `null` values will be
published in the output document. By default, `keep_null` is set to `false`.


[float]
=== Example configuration
Expand Down
4 changes: 4 additions & 0 deletions auditbeat/module/auditd/_meta/config.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
rate_limit: 0
include_raw_message: false
include_warnings: false

# Set to true to publish fields with null values in events.
#keep_null: false

{{ end -}}
# Load audit rules from separate files. Same format as audit.rules(7).
audit_rule_files: [ '${path.config}/audit.rules.d/*.conf' ]
Expand Down
3 changes: 3 additions & 0 deletions auditbeat/module/auditd/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ time.
- `none`: No backpressure mitigation measures are enabled.
--

*`keep_null`*:: If this option is set to true, fields with `null` values will be
published in the output document. By default, `keep_null` is set to `false`.

[float]
=== Audit rules

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/module/file_integrity/_meta/config.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,7 @@

# Detect changes to files included in subdirectories. Disabled by default.
recursive: false

# Set to true to publish fields with null values in events.
#keep_null: false
{{ end }}
3 changes: 3 additions & 0 deletions auditbeat/module/file_integrity/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ The supported hash types are `blake2b_256`, `blake2b_384`, `blake2b_512`, `md5`,
of these directories are watched. If `recursive` is set to `true`, the
`file_integrity` module will watch for changes on these directories and all
their subdirectories.

*`keep_null`*:: If this option is set to true, fields with `null` values will be
published in the output document. By default, `keep_null` is set to `false`.
3 changes: 3 additions & 0 deletions filebeat/_meta/common.reference.inputs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ filebeat.inputs:
# fields.
#fields_under_root: false

# Set to true to publish fields with null values in events.
#keep_null: false

# Ignore files which were modified more than the defined timespan in the past.
# ignore_older is disabled by default, so no files are ignored by setting it to 0.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
Expand Down
1 change: 1 addition & 0 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = userProcessors
clientCfg.Processing.KeepNull = config.KeepNull
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type inputOutletConfig struct {
// event processing
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
KeepNull bool `config:"keep_null"`

// implicit event fields
Type string `config:"type"` // input.type
Expand Down
5 changes: 5 additions & 0 deletions filebeat/docs/inputs/input-common-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,8 @@ this option usually results in simpler configuration files. If the pipeline is
configured both in the input and output, the option from the
input is used.

[float]
===== `keep_null`

If this option is set to true, fields with `null` values will be published in
the output document. By default, `keep_null` is set to `false`.
3 changes: 3 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ filebeat.inputs:
# fields.
#fields_under_root: false

# Set to true to publish fields with null values in events.
#keep_null: false

# Ignore files which were modified more than the defined timespan in the past.
# ignore_older is disabled by default, so no files are ignored by setting it to 0.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
Expand Down
7 changes: 7 additions & 0 deletions heartbeat/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -158,6 +161,8 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

- type: http # monitor type `http`. Connect via HTTP and optionally verify response

Expand Down Expand Up @@ -246,6 +251,8 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

heartbeat.scheduler:
# Limit number of concurrent tasks executed by heartbeat. The task limit if
Expand Down
9 changes: 8 additions & 1 deletion heartbeat/docs/heartbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ A list of processors to apply to the data generated by the monitor.
See <<filtering-and-enhancing-data>> for information about specifying
processors in your config.

[float]
[[monitor-keep-null]]
==== `keep_null`

If this option is set to true, fields with `null` values will be published in
the output document. By default, `keep_null` is set to `false`.

[float]
[[monitor-icmp-options]]
=== ICMP options
Expand Down Expand Up @@ -446,7 +453,7 @@ Set `response.include_body_max_bytes` to control the maximum size of the stored
[[monitor-http-check]]
==== `check`

An optional `request` to send to the remote host and the expected `response`.
An optional `request` to send to the remote host and the expected `response`.

Example configuration:

Expand Down
7 changes: 7 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -158,6 +161,8 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

- type: http # monitor type `http`. Connect via HTTP and optionally verify response

Expand Down Expand Up @@ -246,6 +251,8 @@ heartbeat.monitors:
# Interval between file changed checks.
#interval: 5s

# Set to true to publish fields with null values in events.
#keep_null: false

heartbeat.scheduler:
# Limit number of concurrent tasks executed by heartbeat. The task limit if
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type jobConfig struct {
// Fields and tags to add to monitor.
EventMetadata common.EventMetadata `config:",inline"`
Processors processors.PluginConfig `config:"processors"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`
}

// ProcessorsError is used to indicate situations when processors could not be loaded.
Expand Down Expand Up @@ -108,6 +111,7 @@ func (t *configuredJob) Start() {
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
KeepNull: t.config.KeepNull,
Fields: fields,
},
})
Expand Down
3 changes: 3 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type ProcessingConfig struct {
// the pipeline processors.
Processor ProcessorList

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
Expand Down
55 changes: 36 additions & 19 deletions libbeat/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,30 @@ var textMarshalerType = reflect.TypeOf((*encoding.TextMarshaler)(nil)).Elem()

type Float float64

// ConvertToGenericEvent normalizes the types contained in the given MapStr.
// EventConverter is used to convert MapStr objects for publishing.
//
// Convert takes a MapStr and returns the converted MapStr.
type EventConverter interface {
Convert(m MapStr) MapStr
}

// GenericEventConverter normalizes MapStr objects so that they can be published.
type GenericEventConverter struct {
	// keepNull holds the keep-null option supplied to the constructor.
	keepNull bool
}

// NewGenericEventConverter returns a GenericEventConverter configured with the
// supplied keepNull option.
func NewGenericEventConverter(keepNull bool) *GenericEventConverter {
	converter := new(GenericEventConverter)
	converter.keepNull = keepNull
	return converter
}

// Convert normalizes the types contained in the given MapStr.
//
// Nil values in maps are dropped during the conversion unless keepNull is set
// on the converter. Any unsupported types that are found in the MapStr are
// dropped and warnings are logged.
func ConvertToGenericEvent(m MapStr) MapStr {
func (e *GenericEventConverter) Convert(m MapStr) MapStr {
keys := make([]string, 0, 10)
event, errs := normalizeMap(m, keys...)
event, errs := e.normalizeMap(m, keys...)
if len(errs) > 0 {
logp.Warn("Unsuccessful conversion to generic event: %v errors: %v, "+
"event=%#v", len(errs), errs, m)
Expand All @@ -56,18 +73,18 @@ func ConvertToGenericEvent(m MapStr) MapStr {
// normalizeMap normalizes each element contained in the given map. If an error
// occurs during normalization, processing of m will continue, and all errors
// are returned at the end.
func normalizeMap(m MapStr, keys ...string) (MapStr, []error) {
func (e *GenericEventConverter) normalizeMap(m MapStr, keys ...string) (MapStr, []error) {
var errs []error

out := make(MapStr, len(m))
for key, value := range m {
v, err := normalizeValue(value, append(keys, key)...)
v, err := e.normalizeValue(value, append(keys, key)...)
if len(err) > 0 {
errs = append(errs, err...)
}

// Drop nil values from maps unless keepNull is set.
if v == nil {
if !e.keepNull && v == nil {
if logp.IsDebug(eventDebugSelector) {
eventDebugf("Dropped nil value from event where key=%v", joinKeys(append(keys, key)...))
}
Expand All @@ -81,12 +98,12 @@ func normalizeMap(m MapStr, keys ...string) (MapStr, []error) {
}

// normalizeMapStrSlice normalizes each individual MapStr.
func normalizeMapStrSlice(maps []MapStr, keys ...string) ([]MapStr, []error) {
func (e *GenericEventConverter) normalizeMapStrSlice(maps []MapStr, keys ...string) ([]MapStr, []error) {
var errs []error

out := make([]MapStr, 0, len(maps))
for i, m := range maps {
normalizedMap, err := normalizeMap(m, append(keys, strconv.Itoa(i))...)
normalizedMap, err := e.normalizeMap(m, append(keys, strconv.Itoa(i))...)
if len(err) > 0 {
errs = append(errs, err...)
}
Expand All @@ -98,12 +115,12 @@ func normalizeMapStrSlice(maps []MapStr, keys ...string) ([]MapStr, []error) {

// normalizeMapStringSlice normalizes each individual map[string]interface{} and
// returns a []MapStr.
func normalizeMapStringSlice(maps []map[string]interface{}, keys ...string) ([]MapStr, []error) {
func (e *GenericEventConverter) normalizeMapStringSlice(maps []map[string]interface{}, keys ...string) ([]MapStr, []error) {
var errs []error

out := make([]MapStr, 0, len(maps))
for i, m := range maps {
normalizedMap, err := normalizeMap(m, append(keys, strconv.Itoa(i))...)
normalizedMap, err := e.normalizeMap(m, append(keys, strconv.Itoa(i))...)
if len(err) > 0 {
errs = append(errs, err...)
}
Expand All @@ -114,13 +131,13 @@ func normalizeMapStringSlice(maps []map[string]interface{}, keys ...string) ([]M
}

// normalizeSlice normalizes each element of the slice and returns a []interface{}.
func normalizeSlice(v reflect.Value, keys ...string) (interface{}, []error) {
func (e *GenericEventConverter) normalizeSlice(v reflect.Value, keys ...string) (interface{}, []error) {
var errs []error
var sliceValues []interface{}

n := v.Len()
for i := 0; i < n; i++ {
sliceValue, err := normalizeValue(v.Index(i).Interface(), append(keys, strconv.Itoa(i))...)
sliceValue, err := e.normalizeValue(v.Index(i).Interface(), append(keys, strconv.Itoa(i))...)
if len(err) > 0 {
errs = append(errs, err...)
}
Expand All @@ -131,7 +148,7 @@ func normalizeSlice(v reflect.Value, keys ...string) (interface{}, []error) {
return sliceValues, errs
}

func normalizeValue(value interface{}, keys ...string) (interface{}, []error) {
func (e *GenericEventConverter) normalizeValue(value interface{}, keys ...string) (interface{}, []error) {
if value == nil {
return nil, nil
}
Expand Down Expand Up @@ -202,20 +219,20 @@ func normalizeValue(value interface{}, keys ...string) (interface{}, []error) {
case []complex64, []complex128:
case Time, []Time:
case MapStr:
return normalizeMap(value.(MapStr), keys...)
return e.normalizeMap(value.(MapStr), keys...)
case []MapStr:
return normalizeMapStrSlice(value.([]MapStr), keys...)
return e.normalizeMapStrSlice(value.([]MapStr), keys...)
case map[string]interface{}:
return normalizeMap(value.(map[string]interface{}), keys...)
return e.normalizeMap(value.(map[string]interface{}), keys...)
case []map[string]interface{}:
return normalizeMapStringSlice(value.([]map[string]interface{}), keys...)
return e.normalizeMapStringSlice(value.([]map[string]interface{}), keys...)
default:
v := reflect.ValueOf(value)

switch v.Type().Kind() {
case reflect.Ptr:
// Dereference pointers.
return normalizeValue(followPointer(value), keys...)
return e.normalizeValue(followPointer(value), keys...)
case reflect.Bool:
return v.Bool(), nil
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
Expand All @@ -229,7 +246,7 @@ func normalizeValue(value interface{}, keys ...string) (interface{}, []error) {
case reflect.String:
return v.String(), nil
case reflect.Array, reflect.Slice:
return normalizeSlice(v, keys...)
return e.normalizeSlice(v, keys...)
case reflect.Map, reflect.Struct:
var m MapStr
err := marshalUnmarshal(value, &m)
Expand Down
Loading

0 comments on commit ec7a9a2

Please sign in to comment.