From a5da85c6439bd16130090a8dff8e3ad8f0c09f24 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Tue, 23 Jun 2020 03:53:55 -0700 Subject: [PATCH] Add support for multiple sets of hints on autodiscover (#18883) Allows defining multiple sets of annotations similar to how processors are defined. This functionality already exists in heartbeat. This change standardizes the utility and adds it to logs and metrics hints builder as well. --- CHANGELOG.next.asciidoc | 1 + filebeat/autodiscover/builder/hints/logs.go | 121 +++-- .../autodiscover/builder/hints/logs_test.go | 414 +++++++++++------- filebeat/docs/autodiscover-hints.asciidoc | 16 + .../autodiscover/builder/hints/monitors.go | 42 +- heartbeat/docs/autodiscover-hints.asciidoc | 15 + libbeat/autodiscover/builder/helper.go | 39 ++ libbeat/autodiscover/builder/helper_test.go | 94 ++++ .../autodiscover/builder/hints/metrics.go | 124 +++--- .../builder/hints/metrics_test.go | 251 +++++++---- metricbeat/docs/autodiscover-hints.asciidoc | 19 + 11 files changed, 734 insertions(+), 402 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2ee62f329a6..224aada05d0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] +- Add support for multiple sets of hints on autodiscover {pull}18883[18883] *Auditbeat* diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 70758a8a028..05014134106 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -79,7 +79,7 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm hints, _ = hIface.(common.MapStr) } - inputConfig := l.getInputs(hints) + inputConfig := l.getInputsConfigs(hints) // If default config is disabled return nothing unless it's explicty enabled if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) { @@ -87,16 +87,12 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return []*common.Config{} } - // If explicty disabled, return nothing + // If explictly disabled, return nothing if builder.IsDisabled(hints, l.config.Key) { logp.Debug("hints.builder", "logs disabled by hint: %+v", event) return []*common.Config{} } - // Clone original config, enable it if disabled - config, _ := common.NewConfigFrom(l.config.DefaultConfig) - config.Remove("enabled", -1) - host, _ := event["host"].(string) if host == "" { return []*common.Config{} @@ -114,58 +110,67 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return template.ApplyConfigTemplate(event, configs) } - tempCfg := common.MapStr{} - mline := l.getMultiline(hints) - if len(mline) != 0 { - tempCfg.Put(multiline, mline) - } - if ilines := l.getIncludeLines(hints); len(ilines) != 0 { - tempCfg.Put(includeLines, ilines) - } - if elines := l.getExcludeLines(hints); len(elines) != 0 { - tempCfg.Put(excludeLines, elines) - } - - if procs := l.getProcessors(hints); len(procs) != 0 { - tempCfg.Put(processors, procs) - } - - if jsonOpts := l.getJSONOptions(hints); len(jsonOpts) != 0 { - tempCfg.Put(json, jsonOpts) - } - // Merge config template with the configs from the annotations - if err := config.Merge(tempCfg); err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) - return []*common.Config{config} - } + var configs []*common.Config + inputs := l.getInputs(hints) + for _, h := range inputs { + // Clone original config, enable it if disabled + config, _ := common.NewConfigFrom(l.config.DefaultConfig) + config.Remove("enabled", -1) + + tempCfg := common.MapStr{} + mline := l.getMultiline(h) + if len(mline) != 0 { + tempCfg.Put(multiline, mline) + } + if ilines := l.getIncludeLines(h); len(ilines) != 0 { + tempCfg.Put(includeLines, ilines) + } + if elines := l.getExcludeLines(h); len(elines) != 0 { + tempCfg.Put(excludeLines, elines) + } - module := l.getModule(hints) - if module != "" { - moduleConf := map[string]interface{}{ - "module": module, + if procs := l.getProcessors(h); len(procs) != 0 { + tempCfg.Put(processors, procs) } - filesets := l.getFilesets(hints, module) - for fileset, conf := range filesets { - filesetConf, _ := common.NewConfigFrom(config) + if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 { + tempCfg.Put(json, jsonOpts) + } + // Merge config template with the configs from the annotations + if err := config.Merge(tempCfg); err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + continue + } - if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { - filesetConf.SetString("stream", -1, conf.Stream) - } else { - filesetConf.SetString("containers.stream", -1, conf.Stream) + module := l.getModule(hints) + if module != "" { + moduleConf := map[string]interface{}{ + "module": module, } - moduleConf[fileset+".enabled"] = conf.Enabled - moduleConf[fileset+".input"] = filesetConf + filesets := l.getFilesets(hints, module) + for fileset, conf := range filesets { + filesetConf, _ := common.NewConfigFrom(config) + + if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { + filesetConf.SetString("stream", -1, conf.Stream) + } else { + filesetConf.SetString("containers.stream", -1, conf.Stream) + } - logp.Debug("hints.builder", "generated config %+v", moduleConf) + moduleConf[fileset+".enabled"] = conf.Enabled + moduleConf[fileset+".input"] = filesetConf + + logp.Debug("hints.builder", "generated config %+v", moduleConf) + } + config, _ = common.NewConfigFrom(moduleConf) } - config, _ = common.NewConfigFrom(moduleConf) + logp.Debug("hints.builder", "generated config %+v", config) + configs = append(configs, config) } - logp.Debug("hints.builder", "generated config %+v", config) // Apply information in event to the template to generate the final config - return template.ApplyConfigTemplate(event, []*common.Config{config}) + return template.ApplyConfigTemplate(event, configs) } func (l *logHints) getMultiline(hints common.MapStr) common.MapStr { @@ -186,7 +191,7 @@ func (l *logHints) getModule(hints common.MapStr) string { return validModuleNames.ReplaceAllString(module, "") } -func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { +func (l *logHints) getInputsConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, l.config.Key) } @@ -248,3 +253,23 @@ func (l *logHints) getFilesets(hints common.MapStr, module string) map[string]*f return filesets } + +func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, l.config.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + l.config.Key: mod, + }) + } + + // Generate this so that no hints with completely valid templates work + if len(output) == 0 { + output = append(output, common.MapStr{ + l.config.Key: common.MapStr{}, + }) + } + + return output +} diff --git a/filebeat/autodiscover/builder/hints/logs_test.go b/filebeat/autodiscover/builder/hints/logs_test.go index b316cdb506c..0dadfe54798 100644 --- a/filebeat/autodiscover/builder/hints/logs_test.go +++ b/filebeat/autodiscover/builder/hints/logs_test.go @@ -54,7 +54,7 @@ func TestGenerateHints(t *testing.T) { config *common.Config event bus.Event len int - result common.MapStr + result []common.MapStr }{ { msg: "Default config is correct", @@ -73,9 +73,11 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "type": "container", + result: []common.MapStr{ + { + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "type": "container", + }, }, }, { @@ -94,7 +96,8 @@ func TestGenerateHints(t *testing.T) { "id": "abc", }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { msg: "Hint to enable when disabled by default works", @@ -119,10 +122,12 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "container", - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, + result: []common.MapStr{ + { + "type": "container", + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + }, }, }, { @@ -136,7 +141,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Hints with logs.disable should return nothing", @@ -149,7 +154,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Empty event hints should return default config", @@ -168,12 +173,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -199,14 +206,62 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test", "^test1"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + "close_timeout": "true", + }, + }, + }, + { + msg: "Hints with two sets of include|exclude_lines must be part of the input config", + config: customCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "1": common.MapStr{ + "exclude_lines": "^test1, ^test2", + }, + "2": common.MapStr{ + "include_lines": "^test1, ^test2", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "exclude_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", + }, + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", }, - "include_lines": []interface{}{"^test", "^test1"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, - "close_timeout": "true", }, }, { @@ -234,16 +289,18 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -268,14 +325,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, }, }, }, @@ -308,20 +367,22 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "close_timeout": "true", - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "%{key1} %{key2}", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "%{key1} %{key2}", + }, + }, + map[string]interface{}{ + "drop_event": nil, }, - }, - map[string]interface{}{ - "drop_event": nil, }, }, }, @@ -348,28 +409,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -397,28 +460,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -447,28 +512,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stdout", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stdout", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stderr", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stderr", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -495,25 +562,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -542,25 +611,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -590,25 +661,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stdout", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stdout", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stderr", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stderr", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -629,15 +702,18 @@ func TestGenerateHints(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) - if test.len != 0 { + assert.Equal(t, test.len, len(cfgs), test.msg) + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.msg) - - assert.Equal(t, test.result, config, test.msg) + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.msg) + if !ok { + break + } + configs = append(configs, config) } - + assert.Equal(t, test.result, configs, test.msg) } } @@ -861,7 +937,7 @@ func TestGenerateHintsWithPaths(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) + assert.Equal(t, test.len, len(cfgs), test.msg) if test.len != 0 { config := common.MapStr{} err := cfgs[0].Unpack(&config) diff --git a/filebeat/docs/autodiscover-hints.asciidoc b/filebeat/docs/autodiscover-hints.asciidoc index 8201de0e620..9c1893c8367 100644 --- a/filebeat/docs/autodiscover-hints.asciidoc +++ b/filebeat/docs/autodiscover-hints.asciidoc @@ -155,6 +155,22 @@ annotations: co.elastic.logs.sidecar/exclude_lines: '^DBG' ----- +[float] +===== Multiple sets of hints +When a container needs multiple inputs to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.logs/exclude_lines: '^DBG' + co.elastic.logs/1.include_lines: '^DBG' + co.elastic.logs/1.processors.dissect.tokenizer: "%{key2} %{key1}" +------------------------------------------------------------------------------------- + +The above configuration would generate two input configurations. The first input handles only debug logs and passes it through a dissect +tokenizer. The second input handles everything but debug logs. + [float] ===== Configuring Namespace Defaults diff --git a/heartbeat/autodiscover/builder/hints/monitors.go b/heartbeat/autodiscover/builder/hints/monitors.go index f9fe8847d3e..ca182fe37f2 100644 --- a/heartbeat/autodiscover/builder/hints/monitors.go +++ b/heartbeat/autodiscover/builder/hints/monitors.go @@ -19,8 +19,6 @@ package hints import ( "fmt" - "sort" - "strconv" "strings" "github.com/elastic/go-ucfg" @@ -97,7 +95,7 @@ func (hb *heartbeatHints) CreateConfig(event bus.Event, options ...ucfg.Option) } tempCfg := common.MapStr{} - monitors := hb.getMonitors(hints) + monitors := builder.GetHintsAsList(hints, hb.config.Key) var configs []*common.Config for _, monitor := range monitors { @@ -138,44 +136,6 @@ func (hb *heartbeatHints) getRawConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, hb.config.Key) } -func (hb *heartbeatHints) getMonitors(hints common.MapStr) []common.MapStr { - raw := builder.GetHintMapStr(hints, hb.config.Key, "") - if raw == nil { - return nil - } - - var words, nums []string - - for key := range raw { - if _, err := strconv.Atoi(key); err != nil { - words = append(words, key) - continue - } else { - nums = append(nums, key) - } - } - - sort.Strings(nums) - - var configs []common.MapStr - for _, key := range nums { - rawCfg, _ := raw[key] - if config, ok := rawCfg.(common.MapStr); ok { - configs = append(configs, config) - } - } - - defaultMap := common.MapStr{} - for _, word := range words { - defaultMap[word] = raw[word] - } - - if len(defaultMap) != 0 { - configs = append(configs, defaultMap) - } - return configs -} - func (hb *heartbeatHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetConfigs(hints, "", "processors") } diff --git a/heartbeat/docs/autodiscover-hints.asciidoc b/heartbeat/docs/autodiscover-hints.asciidoc index d89b52f8508..0b6a44115c9 100644 --- a/heartbeat/docs/autodiscover-hints.asciidoc +++ b/heartbeat/docs/autodiscover-hints.asciidoc @@ -92,6 +92,21 @@ annotations: ----- +[float] +===== Multiple sets of hints +When a container needs multiple monitors to be defined on it, sets of annotations can be provided with numeric prefixes. +Annotations without numeric prefixes would default into a single monitor configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.monitor/type: http + co.elastic.monitor/hosts: ${data.host}:8080/healtlz + co.elastic.monitor/schedule: "@every 5s" + co.elastic.monitor/1.type: tcp + co.elastic.monitor/1.hosts: ${data.host}:8080 + co.elastic.monitor/1.schedule: "@every 5s" +------------------------------------------------------------------------------------- [float] diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index b6d52a08eb5..faaa112c65f 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -248,3 +248,42 @@ func GenerateHints(annotations common.MapStr, container, prefix string) common.M return hints } + +// GetHintsAsList gets a set of hints and tries to convert them into a list of hints +func GetHintsAsList(hints common.MapStr, key string) []common.MapStr { + raw := GetHintMapStr(hints, key, "") + if raw == nil { + return nil + } + + var words, nums []string + + for key := range raw { + if _, err := strconv.Atoi(key); err != nil { + words = append(words, key) + continue + } else { + nums = append(nums, key) + } + } + + sort.Strings(nums) + + var configs []common.MapStr + for _, key := range nums { + rawCfg, _ := raw[key] + if config, ok := rawCfg.(common.MapStr); ok { + configs = append(configs, config) + } + } + + defaultMap := common.MapStr{} + for _, word := range words { + defaultMap[word] = raw[word] + } + + if len(defaultMap) != 0 { + configs = append(configs, defaultMap) + } + return configs +} diff --git a/libbeat/autodiscover/builder/helper_test.go b/libbeat/autodiscover/builder/helper_test.go index e8b5ce52179..dee7c95ef13 100644 --- a/libbeat/autodiscover/builder/helper_test.go +++ b/libbeat/autodiscover/builder/helper_test.go @@ -219,3 +219,97 @@ func TestGenerateHints(t *testing.T) { assert.Equal(t, test.result, GenerateHints(annMap, "foobar", "co.elastic")) } } +func TestGetHintsAsList(t *testing.T) { + tests := []struct { + input common.MapStr + output []common.MapStr + message string + }{ + { + input: common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint with numeric prefix should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "2": common.MapStr{ + "module": "dropwizard", + "period": "20s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix should return configs in numeric ordering", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "module": "dropwizard", + "period": "20s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix and default should return configs with defaults at the last", + }, + } + + for _, test := range tests { + t.Run(test.message, func(t *testing.T) { + assert.Equal(t, test.output, GetHintsAsList(test.input, "metrics")) + }) + } +} diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index 84ce8d65a86..52eba34a1ff 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -75,12 +75,12 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) { // Create configs based on hints passed from providers func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config { var ( - config []*common.Config - noPort bool + configs []*common.Config + noPort bool ) host, _ := event["host"].(string) if host == "" { - return config + return configs } port, ok := common.TryToInt(event["port"]) @@ -90,10 +90,10 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c hints, ok := event["hints"].(common.MapStr) if !ok { - return config + return configs } - modulesConfig := m.getModules(hints) + modulesConfig := m.getModuleConfigs(hints) // here we handle raw configs if provided if modulesConfig != nil { configs := []*common.Config{} @@ -108,64 +108,67 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c } - mod := m.getModule(hints) - if mod == "" { - return config - } + modules := m.getModules(hints) + for _, hint := range modules { + mod := m.getModule(hint) + if mod == "" { + continue + } - hosts, ok := m.getHostsWithPort(hints, port, noPort) - if !ok { - return config - } + hosts, ok := m.getHostsWithPort(hint, port, noPort) + if !ok { + continue + } - ns := m.getNamespace(hints) - msets := m.getMetricSets(hints, mod) - tout := m.getTimeout(hints) - ival := m.getPeriod(hints) - sslConf := m.getSSLConfig(hints) - procs := m.getProcessors(hints) - metricspath := m.getMetricPath(hints) - username := m.getUsername(hints) - password := m.getPassword(hints) - - moduleConfig := common.MapStr{ - "module": mod, - "metricsets": msets, - "hosts": hosts, - "timeout": tout, - "period": ival, - "enabled": true, - "ssl": sslConf, - "processors": procs, - } + ns := m.getNamespace(hint) + msets := m.getMetricSets(hint, mod) + tout := m.getTimeout(hint) + ival := m.getPeriod(hint) + sslConf := m.getSSLConfig(hint) + procs := m.getProcessors(hint) + metricspath := m.getMetricPath(hint) + username := m.getUsername(hint) + password := m.getPassword(hint) + + moduleConfig := common.MapStr{ + "module": mod, + "metricsets": msets, + "hosts": hosts, + "timeout": tout, + "period": ival, + "enabled": true, + "ssl": sslConf, + "processors": procs, + } - if ns != "" { - moduleConfig["namespace"] = ns - } - if metricspath != "" { - moduleConfig["metrics_path"] = metricspath - } - if username != "" { - moduleConfig["username"] = username - } - if password != "" { - moduleConfig["password"] = password - } + if ns != "" { + moduleConfig["namespace"] = ns + } + if metricspath != "" { + moduleConfig["metrics_path"] = metricspath + } + if username != "" { + moduleConfig["username"] = username + } + if password != "" { + moduleConfig["password"] = password + } - m.logger.Debug("generated config: %v", moduleConfig) + logp.Debug("hints.builder", "generated config: %v", moduleConfig) - // Create config object - cfg, err := common.NewConfigFrom(moduleConfig) - if err != nil { - logp.Debug("", "config merge failed with error: %v", err) + // Create config object + cfg, err := common.NewConfigFrom(moduleConfig) + if err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + } + logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + configs = append(configs, cfg) } - m.logger.Debug("generated config: %+v", common.DebugString(cfg, true)) - config = append(config, cfg) // Apply information in event to the template to generate the final config // This especially helps in a scenario where endpoints are configured as: // co.elastic.metrics/hosts= "${data.host}:9090" - return template.ApplyConfigTemplate(event, config, options...) + return template.ApplyConfigTemplate(event, configs, options...) } func (m *metricHints) getModule(hints common.MapStr) string { @@ -267,7 +270,7 @@ func (m *metricHints) getSSLConfig(hints common.MapStr) common.MapStr { return builder.GetHintMapStr(hints, m.Key, ssl) } -func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { +func (m *metricHints) getModuleConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, m.Key) } @@ -275,3 +278,16 @@ func (m *metricHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetProcessors(hints, m.Key) } + +func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, m.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + m.Key: mod, + }) + } + + return output +} diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index 3b306ac62c0..d2d0462a047 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -38,7 +38,7 @@ func TestGenerateHints(t *testing.T) { message string event bus.Event len int - result common.MapStr + result []common.MapStr }{ { message: "Empty event hints should return empty config", @@ -58,7 +58,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without host should return nothing", @@ -70,7 +70,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without matching port should return nothing", @@ -85,7 +85,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints with multiple hosts return only the matching one", @@ -100,13 +100,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -122,13 +124,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -142,12 +146,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one", "two"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one", "two"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -162,12 +168,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -181,12 +189,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -200,12 +210,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -222,14 +234,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -250,18 +264,20 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, - "processors": []interface{}{ - map[string]interface{}{ - "add_locale": map[string]interface{}{ - "abbrevation": "MST", + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + "processors": []interface{}{ + map[string]interface{}{ + "add_locale": map[string]interface{}{ + "abbrevation": "MST", + }, }, }, }, @@ -296,14 +312,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"1.2.3.4:9090"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"1.2.3.4:9090"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -311,15 +329,18 @@ func TestGenerateHints(t *testing.T) { event: bus.Event{ "host": "1.2.3.4", "port": 80, - "hints": common.MapStr{ - "metrics": common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "hosts": "${data.host}:8080", + "hints": []common.MapStr{ + { + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:8080", + }, }, }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { message: "Non http URLs with valid host port combination should return a valid config", @@ -335,14 +356,57 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + }, + { + message: "Module with mutliple sets of hints must return the right configs", + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + "2": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test1", + "hosts": "${data.host}:9090/fake", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, + { + "module": "mockmoduledefaults", + "namespace": "test1", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090/fake"}, + }, }, }, } @@ -361,11 +425,18 @@ func TestGenerateHints(t *testing.T) { cfgs := m.CreateConfig(test.event) assert.Equal(t, len(cfgs), test.len) - if len(cfgs) != 0 { + // The check below helps skipping config validation if there is no config supposed to be emitted. + if len(cfgs) == 0 { + continue + } + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.message) - + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.message) + if !ok { + break + } // metricsets order is random, order it for tests if v, err := config.GetValue("metricsets"); err == nil { if msets, ok := v.([]interface{}); ok { @@ -377,9 +448,9 @@ func TestGenerateHints(t *testing.T) { config["metricsets"] = metricsets } } - - assert.Equal(t, test.result, config, test.message) + configs = append(configs, config) } + assert.Equal(t, test.result, configs, test.message) } } diff --git a/metricbeat/docs/autodiscover-hints.asciidoc b/metricbeat/docs/autodiscover-hints.asciidoc index 25029a61d05..ef99de73fc6 100644 --- a/metricbeat/docs/autodiscover-hints.asciidoc +++ b/metricbeat/docs/autodiscover-hints.asciidoc @@ -129,6 +129,25 @@ annotations: co.elastic.metrics.sidecar/hosts: '${data.host}:8080' ------------------------------------------------------------------------------------- +[float] +===== Multiple sets of hints +When a container port needs multiple modules to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.metrics/1.module: prometheus + co.elastic.metrics/1.hosts: '${data.host}:80/metrics' + co.elastic.metrics/1.period: 60s + co.elastic.metrics/module: prometheus + co.elastic.metrics/hosts: '${data.host}:80/metrics/p1' + co.elastic.metrics/period: 5s +------------------------------------------------------------------------------------- + +The above configuration would spin up two metricbeat module configurations to ensure that the endpoint "/metrics/p1" is +polled every 60s whereas the "/metrics" endpoint is polled every 60s. + [float] ===== Configuring Namespace Defaults