diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2ee62f329a6..224aada05d0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] +- Add support for multiple sets of hints on autodiscover {pull}18883[18883] *Auditbeat* diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 70758a8a028..05014134106 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -79,7 +79,7 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm hints, _ = hIface.(common.MapStr) } - inputConfig := l.getInputs(hints) + inputConfig := l.getInputsConfigs(hints) // If default config is disabled return nothing unless it's explicty enabled if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) { @@ -87,16 +87,12 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return []*common.Config{} } - // If explicty disabled, return nothing + // If explictly disabled, return nothing if builder.IsDisabled(hints, l.config.Key) { logp.Debug("hints.builder", "logs disabled by hint: %+v", event) return []*common.Config{} } - // Clone original config, enable it if disabled - config, _ := common.NewConfigFrom(l.config.DefaultConfig) - config.Remove("enabled", -1) - host, _ := event["host"].(string) if host == "" { return []*common.Config{} @@ -114,58 +110,67 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return template.ApplyConfigTemplate(event, configs) } - tempCfg := common.MapStr{} - mline := l.getMultiline(hints) - if len(mline) != 0 { - tempCfg.Put(multiline, mline) - } - if ilines := l.getIncludeLines(hints); len(ilines) != 0 { - tempCfg.Put(includeLines, ilines) - } - if elines := l.getExcludeLines(hints); len(elines) != 0 { - tempCfg.Put(excludeLines, elines) - } - - if procs := l.getProcessors(hints); len(procs) != 0 { - tempCfg.Put(processors, procs) - } - - if jsonOpts := l.getJSONOptions(hints); len(jsonOpts) != 0 { - tempCfg.Put(json, jsonOpts) - } - // Merge config template with the configs from the annotations - if err := config.Merge(tempCfg); err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) - return []*common.Config{config} - } + var configs []*common.Config + inputs := l.getInputs(hints) + for _, h := range inputs { + // Clone original config, enable it if disabled + config, _ := common.NewConfigFrom(l.config.DefaultConfig) + config.Remove("enabled", -1) + + tempCfg := common.MapStr{} + mline := l.getMultiline(h) + if len(mline) != 0 { + tempCfg.Put(multiline, mline) + } + if ilines := l.getIncludeLines(h); len(ilines) != 0 { + tempCfg.Put(includeLines, ilines) + } + if elines := l.getExcludeLines(h); len(elines) != 0 { + tempCfg.Put(excludeLines, elines) + } - module := l.getModule(hints) - if module != "" { - moduleConf := map[string]interface{}{ - "module": module, + if procs := l.getProcessors(h); len(procs) != 0 { + tempCfg.Put(processors, procs) } - filesets := l.getFilesets(hints, module) - for fileset, conf := range filesets { - filesetConf, _ := common.NewConfigFrom(config) + if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 { + tempCfg.Put(json, jsonOpts) + } + // Merge config template with the configs from the annotations + if err := config.Merge(tempCfg); err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + continue + } - if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { - filesetConf.SetString("stream", -1, conf.Stream) - } else { - filesetConf.SetString("containers.stream", -1, conf.Stream) + module := l.getModule(hints) + if module != "" { + moduleConf := map[string]interface{}{ + "module": module, } - moduleConf[fileset+".enabled"] = conf.Enabled - moduleConf[fileset+".input"] = filesetConf + filesets := l.getFilesets(hints, module) + for fileset, conf := range filesets { + filesetConf, _ := common.NewConfigFrom(config) + + if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { + filesetConf.SetString("stream", -1, conf.Stream) + } else { + filesetConf.SetString("containers.stream", -1, conf.Stream) + } - logp.Debug("hints.builder", "generated config %+v", moduleConf) + moduleConf[fileset+".enabled"] = conf.Enabled + moduleConf[fileset+".input"] = filesetConf + + logp.Debug("hints.builder", "generated config %+v", moduleConf) + } + config, _ = common.NewConfigFrom(moduleConf) } - config, _ = common.NewConfigFrom(moduleConf) + logp.Debug("hints.builder", "generated config %+v", config) + configs = append(configs, config) } - logp.Debug("hints.builder", "generated config %+v", config) // Apply information in event to the template to generate the final config - return template.ApplyConfigTemplate(event, []*common.Config{config}) + return template.ApplyConfigTemplate(event, configs) } func (l *logHints) getMultiline(hints common.MapStr) common.MapStr { @@ -186,7 +191,7 @@ func (l *logHints) getModule(hints common.MapStr) string { return validModuleNames.ReplaceAllString(module, "") } -func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { +func (l *logHints) getInputsConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, l.config.Key) } @@ -248,3 +253,23 @@ func (l *logHints) getFilesets(hints common.MapStr, module string) map[string]*f return filesets } + +func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, l.config.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + l.config.Key: mod, + }) + } + + // Generate this so that no hints with completely valid templates work + if len(output) == 0 { + output = append(output, common.MapStr{ + l.config.Key: common.MapStr{}, + }) + } + + return output +} diff --git a/filebeat/autodiscover/builder/hints/logs_test.go b/filebeat/autodiscover/builder/hints/logs_test.go index b316cdb506c..0dadfe54798 100644 --- a/filebeat/autodiscover/builder/hints/logs_test.go +++ b/filebeat/autodiscover/builder/hints/logs_test.go @@ -54,7 +54,7 @@ func TestGenerateHints(t *testing.T) { config *common.Config event bus.Event len int - result common.MapStr + result []common.MapStr }{ { msg: "Default config is correct", @@ -73,9 +73,11 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "type": "container", + result: []common.MapStr{ + { + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "type": "container", + }, }, }, { @@ -94,7 +96,8 @@ func TestGenerateHints(t *testing.T) { "id": "abc", }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { msg: "Hint to enable when disabled by default works", @@ -119,10 +122,12 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "container", - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, + result: []common.MapStr{ + { + "type": "container", + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + }, }, }, { @@ -136,7 +141,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Hints with logs.disable should return nothing", @@ -149,7 +154,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Empty event hints should return default config", @@ -168,12 +173,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -199,14 +206,62 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test", "^test1"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + "close_timeout": "true", + }, + }, + }, + { + msg: "Hints with two sets of include|exclude_lines must be part of the input config", + config: customCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "1": common.MapStr{ + "exclude_lines": "^test1, ^test2", + }, + "2": common.MapStr{ + "include_lines": "^test1, ^test2", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "exclude_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", + }, + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", }, - "include_lines": []interface{}{"^test", "^test1"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, - "close_timeout": "true", }, }, { @@ -234,16 +289,18 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -268,14 +325,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, }, }, }, @@ -308,20 +367,22 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "close_timeout": "true", - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "%{key1} %{key2}", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "%{key1} %{key2}", + }, + }, + map[string]interface{}{ + "drop_event": nil, }, - }, - map[string]interface{}{ - "drop_event": nil, }, }, }, @@ -348,28 +409,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -397,28 +460,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -447,28 +512,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stdout", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stdout", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stderr", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stderr", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -495,25 +562,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -542,25 +611,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -590,25 +661,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stdout", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stdout", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stderr", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stderr", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -629,15 +702,18 @@ func TestGenerateHints(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) - if test.len != 0 { + assert.Equal(t, test.len, len(cfgs), test.msg) + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.msg) - - assert.Equal(t, test.result, config, test.msg) + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.msg) + if !ok { + break + } + configs = append(configs, config) } - + assert.Equal(t, test.result, configs, test.msg) } } @@ -861,7 +937,7 @@ func TestGenerateHintsWithPaths(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) + assert.Equal(t, test.len, len(cfgs), test.msg) if test.len != 0 { config := common.MapStr{} err := cfgs[0].Unpack(&config) diff --git a/filebeat/docs/autodiscover-hints.asciidoc b/filebeat/docs/autodiscover-hints.asciidoc index 8201de0e620..9c1893c8367 100644 --- a/filebeat/docs/autodiscover-hints.asciidoc +++ b/filebeat/docs/autodiscover-hints.asciidoc @@ -155,6 +155,22 @@ annotations: co.elastic.logs.sidecar/exclude_lines: '^DBG' ----- +[float] +===== Multiple sets of hints +When a container needs multiple inputs to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.logs/exclude_lines: '^DBG' + co.elastic.logs/1.include_lines: '^DBG' + co.elastic.logs/1.processors.dissect.tokenizer: "%{key2} %{key1}" +------------------------------------------------------------------------------------- + +The above configuration would generate two input configurations. The first input handles only debug logs and passes it through a dissect +tokenizer. The second input handles everything but debug logs. + [float] ===== Configuring Namespace Defaults diff --git a/heartbeat/autodiscover/builder/hints/monitors.go b/heartbeat/autodiscover/builder/hints/monitors.go index f9fe8847d3e..ca182fe37f2 100644 --- a/heartbeat/autodiscover/builder/hints/monitors.go +++ b/heartbeat/autodiscover/builder/hints/monitors.go @@ -19,8 +19,6 @@ package hints import ( "fmt" - "sort" - "strconv" "strings" "github.com/elastic/go-ucfg" @@ -97,7 +95,7 @@ func (hb *heartbeatHints) CreateConfig(event bus.Event, options ...ucfg.Option) } tempCfg := common.MapStr{} - monitors := hb.getMonitors(hints) + monitors := builder.GetHintsAsList(hints, hb.config.Key) var configs []*common.Config for _, monitor := range monitors { @@ -138,44 +136,6 @@ func (hb *heartbeatHints) getRawConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, hb.config.Key) } -func (hb *heartbeatHints) getMonitors(hints common.MapStr) []common.MapStr { - raw := builder.GetHintMapStr(hints, hb.config.Key, "") - if raw == nil { - return nil - } - - var words, nums []string - - for key := range raw { - if _, err := strconv.Atoi(key); err != nil { - words = append(words, key) - continue - } else { - nums = append(nums, key) - } - } - - sort.Strings(nums) - - var configs []common.MapStr - for _, key := range nums { - rawCfg, _ := raw[key] - if config, ok := rawCfg.(common.MapStr); ok { - configs = append(configs, config) - } - } - - defaultMap := common.MapStr{} - for _, word := range words { - defaultMap[word] = raw[word] - } - - if len(defaultMap) != 0 { - configs = append(configs, defaultMap) - } - return configs -} - func (hb *heartbeatHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetConfigs(hints, "", "processors") } diff --git a/heartbeat/docs/autodiscover-hints.asciidoc b/heartbeat/docs/autodiscover-hints.asciidoc index d89b52f8508..0b6a44115c9 100644 --- a/heartbeat/docs/autodiscover-hints.asciidoc +++ b/heartbeat/docs/autodiscover-hints.asciidoc @@ -92,6 +92,21 @@ annotations: ----- +[float] +===== Multiple sets of hints +When a container needs multiple monitors to be defined on it, sets of annotations can be provided with numeric prefixes. +Annotations without numeric prefixes would default into a single monitor configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.monitor/type: http + co.elastic.monitor/hosts: ${data.host}:8080/healtlz + co.elastic.monitor/schedule: "@every 5s" + co.elastic.monitor/1.type: tcp + co.elastic.monitor/1.hosts: ${data.host}:8080 + co.elastic.monitor/1.schedule: "@every 5s" +------------------------------------------------------------------------------------- [float] diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index b6d52a08eb5..faaa112c65f 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -248,3 +248,42 @@ func GenerateHints(annotations common.MapStr, container, prefix string) common.M return hints } + +// GetHintsAsList gets a set of hints and tries to convert them into a list of hints +func GetHintsAsList(hints common.MapStr, key string) []common.MapStr { + raw := GetHintMapStr(hints, key, "") + if raw == nil { + return nil + } + + var words, nums []string + + for key := range raw { + if _, err := strconv.Atoi(key); err != nil { + words = append(words, key) + continue + } else { + nums = append(nums, key) + } + } + + sort.Strings(nums) + + var configs []common.MapStr + for _, key := range nums { + rawCfg, _ := raw[key] + if config, ok := rawCfg.(common.MapStr); ok { + configs = append(configs, config) + } + } + + defaultMap := common.MapStr{} + for _, word := range words { + defaultMap[word] = raw[word] + } + + if len(defaultMap) != 0 { + configs = append(configs, defaultMap) + } + return configs +} diff --git a/libbeat/autodiscover/builder/helper_test.go b/libbeat/autodiscover/builder/helper_test.go index e8b5ce52179..dee7c95ef13 100644 --- a/libbeat/autodiscover/builder/helper_test.go +++ b/libbeat/autodiscover/builder/helper_test.go @@ -219,3 +219,97 @@ func TestGenerateHints(t *testing.T) { assert.Equal(t, test.result, GenerateHints(annMap, "foobar", "co.elastic")) } } +func TestGetHintsAsList(t *testing.T) { + tests := []struct { + input common.MapStr + output []common.MapStr + message string + }{ + { + input: common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint with numeric prefix should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "2": common.MapStr{ + "module": "dropwizard", + "period": "20s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix should return configs in numeric ordering", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "module": "dropwizard", + "period": "20s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix and default should return configs with defaults at the last", + }, + } + + for _, test := range tests { + t.Run(test.message, func(t *testing.T) { + assert.Equal(t, test.output, GetHintsAsList(test.input, "metrics")) + }) + } +} diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index 84ce8d65a86..52eba34a1ff 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -75,12 +75,12 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) { // Create configs based on hints passed from providers func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config { var ( - config []*common.Config - noPort bool + configs []*common.Config + noPort bool ) host, _ := event["host"].(string) if host == "" { - return config + return configs } port, ok := common.TryToInt(event["port"]) @@ -90,10 +90,10 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c hints, ok := event["hints"].(common.MapStr) if !ok { - return config + return configs } - modulesConfig := m.getModules(hints) + modulesConfig := m.getModuleConfigs(hints) // here we handle raw configs if provided if modulesConfig != nil { configs := []*common.Config{} @@ -108,64 +108,67 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c } - mod := m.getModule(hints) - if mod == "" { - return config - } + modules := m.getModules(hints) + for _, hint := range modules { + mod := m.getModule(hint) + if mod == "" { + continue + } - hosts, ok := m.getHostsWithPort(hints, port, noPort) - if !ok { - return config - } + hosts, ok := m.getHostsWithPort(hint, port, noPort) + if !ok { + continue + } - ns := m.getNamespace(hints) - msets := m.getMetricSets(hints, mod) - tout := m.getTimeout(hints) - ival := m.getPeriod(hints) - sslConf := m.getSSLConfig(hints) - procs := m.getProcessors(hints) - metricspath := m.getMetricPath(hints) - username := m.getUsername(hints) - password := m.getPassword(hints) - - moduleConfig := common.MapStr{ - "module": mod, - "metricsets": msets, - "hosts": hosts, - "timeout": tout, - "period": ival, - "enabled": true, - "ssl": sslConf, - "processors": procs, - } + ns := m.getNamespace(hint) + msets := m.getMetricSets(hint, mod) + tout := m.getTimeout(hint) + ival := m.getPeriod(hint) + sslConf := m.getSSLConfig(hint) + procs := m.getProcessors(hint) + metricspath := m.getMetricPath(hint) + username := m.getUsername(hint) + password := m.getPassword(hint) + + moduleConfig := common.MapStr{ + "module": mod, + "metricsets": msets, + "hosts": hosts, + "timeout": tout, + "period": ival, + "enabled": true, + "ssl": sslConf, + "processors": procs, + } - if ns != "" { - moduleConfig["namespace"] = ns - } - if metricspath != "" { - moduleConfig["metrics_path"] = metricspath - } - if username != "" { - moduleConfig["username"] = username - } - if password != "" { - moduleConfig["password"] = password - } + if ns != "" { + moduleConfig["namespace"] = ns + } + if metricspath != "" { + moduleConfig["metrics_path"] = metricspath + } + if username != "" { + moduleConfig["username"] = username + } + if password != "" { + moduleConfig["password"] = password + } - m.logger.Debug("generated config: %v", moduleConfig) + logp.Debug("hints.builder", "generated config: %v", moduleConfig) - // Create config object - cfg, err := common.NewConfigFrom(moduleConfig) - if err != nil { - logp.Debug("", "config merge failed with error: %v", err) + // Create config object + cfg, err := common.NewConfigFrom(moduleConfig) + if err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + } + logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + configs = append(configs, cfg) } - m.logger.Debug("generated config: %+v", common.DebugString(cfg, true)) - config = append(config, cfg) // Apply information in event to the template to generate the final config // This especially helps in a scenario where endpoints are configured as: // co.elastic.metrics/hosts= "${data.host}:9090" - return template.ApplyConfigTemplate(event, config, options...) + return template.ApplyConfigTemplate(event, configs, options...) } func (m *metricHints) getModule(hints common.MapStr) string { @@ -267,7 +270,7 @@ func (m *metricHints) getSSLConfig(hints common.MapStr) common.MapStr { return builder.GetHintMapStr(hints, m.Key, ssl) } -func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { +func (m *metricHints) getModuleConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, m.Key) } @@ -275,3 +278,16 @@ func (m *metricHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetProcessors(hints, m.Key) } + +func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, m.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + m.Key: mod, + }) + } + + return output +} diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index 3b306ac62c0..d2d0462a047 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -38,7 +38,7 @@ func TestGenerateHints(t *testing.T) { message string event bus.Event len int - result common.MapStr + result []common.MapStr }{ { message: "Empty event hints should return empty config", @@ -58,7 +58,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without host should return nothing", @@ -70,7 +70,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without matching port should return nothing", @@ -85,7 +85,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints with multiple hosts return only the matching one", @@ -100,13 +100,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -122,13 +124,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -142,12 +146,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one", "two"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one", "two"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -162,12 +168,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -181,12 +189,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -200,12 +210,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -222,14 +234,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -250,18 +264,20 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, - "processors": []interface{}{ - map[string]interface{}{ - "add_locale": map[string]interface{}{ - "abbrevation": "MST", + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + "processors": []interface{}{ + map[string]interface{}{ + "add_locale": map[string]interface{}{ + "abbrevation": "MST", + }, }, }, }, @@ -296,14 +312,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"1.2.3.4:9090"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"1.2.3.4:9090"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -311,15 +329,18 @@ func TestGenerateHints(t *testing.T) { event: bus.Event{ "host": "1.2.3.4", "port": 80, - "hints": common.MapStr{ - "metrics": common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "hosts": "${data.host}:8080", + "hints": []common.MapStr{ + { + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:8080", + }, }, }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { message: "Non http URLs with valid host port combination should return a valid config", @@ -335,14 +356,57 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + }, + { + message: "Module with mutliple sets of hints must return the right configs", + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + "2": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test1", + "hosts": "${data.host}:9090/fake", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, + { + "module": "mockmoduledefaults", + "namespace": "test1", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090/fake"}, + }, }, }, } @@ -361,11 +425,18 @@ func TestGenerateHints(t *testing.T) { cfgs := m.CreateConfig(test.event) assert.Equal(t, len(cfgs), test.len) - if len(cfgs) != 0 { + // The check below helps skipping config validation if there is no config supposed to be emitted. + if len(cfgs) == 0 { + continue + } + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.message) - + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.message) + if !ok { + break + } // metricsets order is random, order it for tests if v, err := config.GetValue("metricsets"); err == nil { if msets, ok := v.([]interface{}); ok { @@ -377,9 +448,9 @@ func TestGenerateHints(t *testing.T) { config["metricsets"] = metricsets } } - - assert.Equal(t, test.result, config, test.message) + configs = append(configs, config) } + assert.Equal(t, test.result, configs, test.message) } } diff --git a/metricbeat/docs/autodiscover-hints.asciidoc b/metricbeat/docs/autodiscover-hints.asciidoc index 25029a61d05..ef99de73fc6 100644 --- a/metricbeat/docs/autodiscover-hints.asciidoc +++ b/metricbeat/docs/autodiscover-hints.asciidoc @@ -129,6 +129,25 @@ annotations: co.elastic.metrics.sidecar/hosts: '${data.host}:8080' ------------------------------------------------------------------------------------- +[float] +===== Multiple sets of hints +When a container port needs multiple modules to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.metrics/1.module: prometheus + co.elastic.metrics/1.hosts: '${data.host}:80/metrics' + co.elastic.metrics/1.period: 60s + co.elastic.metrics/module: prometheus + co.elastic.metrics/hosts: '${data.host}:80/metrics/p1' + co.elastic.metrics/period: 5s +------------------------------------------------------------------------------------- + +The above configuration would spin up two metricbeat module configurations to ensure that the endpoint "/metrics/p1" is +polled every 60s whereas the "/metrics" endpoint is polled every 60s. + [float] ===== Configuring Namespace Defaults