From 5cdb3125950c534ce3c8d1eb311004a7fa60616b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 6 Apr 2022 10:43:59 +0200 Subject: [PATCH] Put data stream so there is no need for additional permissions (#31048) ## What does this PR do? This PR adds a new step to loading templates. Now not only the template is loaded, but the data stream is created as well. Given that users might load templates from JSON file that are not data streams, I added a new option called `setup.template.json.data_stream`. It has to be set, if the JSON template is a data stream. ## Why is it important? Without this change users needed more permissions to publish events. Now `create_doc` priviledge is enough to publish events to the data stream. Closes https://github.com/elastic/beats/issues/30647 Closes https://github.com/elastic/beats/pull/30567 --- CHANGELOG.next.asciidoc | 2 + auditbeat/auditbeat.reference.yml | 3 + filebeat/filebeat.reference.yml | 3 + heartbeat/heartbeat.reference.yml | 3 + .../config/setup.template.reference.yml.tmpl | 3 + libbeat/docs/template-config.asciidoc | 3 + libbeat/esleg/eslegclient/connection.go | 17 +- libbeat/template/config.go | 7 +- libbeat/template/load.go | 62 +++++- libbeat/template/load_integration_test.go | 202 +++++++++++++++++- libbeat/template/load_test.go | 11 +- .../template/testdata/fields-data-stream.json | 19 ++ metricbeat/metricbeat.reference.yml | 3 + packetbeat/packetbeat.reference.yml | 3 + winlogbeat/winlogbeat.reference.yml | 3 + x-pack/auditbeat/auditbeat.reference.yml | 3 + x-pack/filebeat/filebeat.reference.yml | 3 + .../functionbeat/functionbeat.reference.yml | 3 + x-pack/heartbeat/heartbeat.reference.yml | 3 + x-pack/metricbeat/metricbeat.reference.yml | 3 + x-pack/osquerybeat/osquerybeat.reference.yml | 3 + x-pack/packetbeat/packetbeat.reference.yml | 3 + x-pack/winlogbeat/winlogbeat.reference.yml | 3 + 23 files changed, 336 insertions(+), 32 deletions(-) create mode 100644 libbeat/template/testdata/fields-data-stream.json diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2ee0be2934d4..b520d9ad279b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452] - Update docker/distribution dependency library to fix a security issues concerning OCI Manifest Type Confusion Issue. {pull}30462[30462] - Fix dissect trim panics from DELETE (127)(\u007f) character {issue}30657[30657] {pull}30658[30658] +- Load data stream during setup, so users do not need extra permissions during publishing. {issue}30647[30647] {pull}31048[31048] - Add ecs container fields {pull}31020[31020] - Fix docs reference for syslog processor {pull}31087[31087] @@ -99,6 +100,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Add syslog parser and processor. {issue}30139[30139] {pull}30541[30541] - Add action_input_type for the .fleet-actions-results {pull}30562[30562] - Add cronjob metadata by default {pull}30637[30637] +- New option `setup.template.json.data_stream` is added to indicate if the JSON index template is a data stream. {pull}31048[31048] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index bfe0eafaeec7..b03c95ab44cf 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -1203,6 +1203,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of auditbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 635dc852a9bd..0caa3da8765a 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -2322,6 +2322,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of filebeat as it might # overload your Elasticsearch with too many update requests. diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 9948b3a0b62b..59380e2050fd 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -1349,6 +1349,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of heartbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/libbeat/_meta/config/setup.template.reference.yml.tmpl b/libbeat/_meta/config/setup.template.reference.yml.tmpl index d3019c6f2b8a..c423eb277e03 100644 --- a/libbeat/_meta/config/setup.template.reference.yml.tmpl +++ b/libbeat/_meta/config/setup.template.reference.yml.tmpl @@ -33,6 +33,9 @@ # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of {{.BeatName}} as it might # overload your Elasticsearch with too many update requests. diff --git a/libbeat/docs/template-config.asciidoc b/libbeat/docs/template-config.asciidoc index 2ec64291b803..f85fbeb11d0d 100644 --- a/libbeat/docs/template-config.asciidoc +++ b/libbeat/docs/template-config.asciidoc @@ -128,7 +128,10 @@ set the name of the template. setup.template.json.enabled: true setup.template.json.path: "template.json" setup.template.json.name: "template-name +setup.template.json.data_stream: false ---------------------------------------------------------------------- NOTE: If the JSON template is used, the `fields.yml` is skipped for the template generation. + +NOTE: If the JSON template is a data stream, set `setup.template.json.data_stream`. diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 6f2f13aa6b2b..2c3bef295e2c 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -91,7 +91,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { u, err := url.Parse(s.URL) if err != nil { - return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err) + return nil, fmt.Errorf("failed to parse elasticsearch URL: %w", err) } if u.User != nil { @@ -242,13 +242,16 @@ func NewConnectedClient(cfg *common.Config, beatname string) (*Connection, error // the configured host, updates the known Elasticsearch version and calls // globally configured handlers. func (conn *Connection) Connect() error { + if conn.log == nil { + conn.log = logp.NewLogger("esclientleg") + } if err := conn.getVersion(); err != nil { return err } if conn.OnConnectCallback != nil { if err := conn.OnConnectCallback(); err != nil { - return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err) + return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %w", err) } } @@ -266,7 +269,7 @@ func (conn *Connection) Ping() (string, error) { } if status >= 300 { - return "", fmt.Errorf("Non 2xx response code: %d", status) + return "", fmt.Errorf("non 2xx response code: %d", status) } var response struct { @@ -277,7 +280,7 @@ func (conn *Connection) Ping() (string, error) { err = json.Unmarshal(body, &response) if err != nil { - return "", fmt.Errorf("Failed to parse JSON response: %v", err) + return "", fmt.Errorf("failed to parse JSON response: %w", err) } conn.log.Debugf("Ping status code: %v", status) @@ -362,7 +365,7 @@ func (conn *Connection) execRequest( method, url string, body io.Reader, ) (int, []byte, error) { - req, err := http.NewRequest(method, url, body) + req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour if err != nil { conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err @@ -376,7 +379,7 @@ func (conn *Connection) execRequest( // GetVersion returns the elasticsearch version the client is connected to. func (conn *Connection) GetVersion() common.Version { if !conn.version.IsValid() { - conn.getVersion() + _ = conn.getVersion() } return conn.version @@ -402,7 +405,7 @@ func (conn *Connection) getVersion() error { func (conn *Connection) LoadJSON(path string, json map[string]interface{}) ([]byte, error) { status, body, err := conn.Request("PUT", path, "", nil, json) if err != nil { - return body, fmt.Errorf("couldn't load json. Error: %s", err) + return body, fmt.Errorf("couldn't load json. Error: %w", err) } if status > 300 { return body, fmt.Errorf("couldn't load json. Status: %v", status) diff --git a/libbeat/template/config.go b/libbeat/template/config.go index 4f9fe2348c7d..9e8a2c246321 100644 --- a/libbeat/template/config.go +++ b/libbeat/template/config.go @@ -29,9 +29,10 @@ type TemplateConfig struct { Pattern string `config:"pattern"` Fields string `config:"fields"` JSON struct { - Enabled bool `config:"enabled"` - Path string `config:"path"` - Name string `config:"name"` + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + IsDataStream bool `config:"data_stream"` } `config:"json"` AppendFields mapping.Fields `config:"append_fields"` Overwrite bool `config:"overwrite"` diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 1900d8f5a75a..ffce2c6db9c4 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -124,6 +124,30 @@ func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, mi return fmt.Errorf("failed to load template: %w", err) } l.log.Infof("Template with name %q loaded.", templateName) + + // if JSON template is loaded and it is not a data stream + // we are done with loading. + if config.JSON.Enabled && !config.JSON.IsDataStream { + return nil + } + + // If a data stream already exists, we do not attempt to delete or overwrite + // it because it would delete all backing indices, and the user would lose all + // their documents. + dataStreamExist, err := l.checkExistsDatastream(templateName) + if err != nil { + return fmt.Errorf("failed to check data stream: %w", err) + } + if dataStreamExist { + l.log.Infof("Data stream with name %q already exists.", templateName) + return nil + } + + if err := l.putDataStream(templateName); err != nil { + return fmt.Errorf("failed to put data stream: %w", err) + } + l.log.Infof("Data stream with name %q loaded.", templateName) + return nil } @@ -135,7 +159,7 @@ func (l *ESLoader) loadTemplate(templateName string, template map[string]interfa path := "/_index_template/" + templateName status, body, err := l.client.Request("PUT", path, "", nil, template) if err != nil { - return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body) + return fmt.Errorf("couldn't load template: %w. Response body: %s", err, body) } if status > http.StatusMultipleChoices { //http status 300 return fmt.Errorf("couldn't load json. Status: %v", status) @@ -143,6 +167,28 @@ func (l *ESLoader) loadTemplate(templateName string, template map[string]interfa return nil } +func (l *ESLoader) checkExistsDatastream(name string) (bool, error) { + status, _, err := l.client.Request("GET", "/_data_stream/"+name, "", nil, nil) + if status == http.StatusNotFound { + return false, nil + } + if err != nil { + return false, err + } + + return true, nil +} + +func (l *ESLoader) putDataStream(name string) error { + l.log.Infof("Try loading data stream %s to Elasticsearch", name) + path := "/_data_stream/" + name + _, body, err := l.client.Request("PUT", path, "", nil, nil) + if err != nil { + return fmt.Errorf("could not put data stream: %w. Response body: %s", err, body) + } + return nil +} + // existsTemplate checks if a given template already exist, using the // `/_index_template/` API. // @@ -176,7 +222,7 @@ func (l *FileLoader) Load(config TemplateConfig, info beat.Info, fields []byte, str := fmt.Sprintf("%s\n", body.StringToPrint()) if err := l.client.Write("template", tmpl.name, str); err != nil { - return fmt.Errorf("error printing template: %v", err) + return fmt.Errorf("error printing template: %w", err) } return nil } @@ -188,7 +234,7 @@ func (b *templateBuilder) template(config TemplateConfig, info beat.Info, esVers } tmpl, err := New(info.Version, info.IndexPrefix, info.ElasticLicensed, esVersion, config, migration) if err != nil { - return nil, fmt.Errorf("error creating template instance: %v", err) + return nil, fmt.Errorf("error creating template instance: %w", err) } return tmpl, nil } @@ -214,18 +260,18 @@ func (b *templateBuilder) buildBody(tmpl *Template, config TemplateConfig, field func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (common.MapStr, error) { jsonPath := paths.Resolve(paths.Config, config.JSON.Path) if _, err := os.Stat(jsonPath); err != nil { - return nil, fmt.Errorf("error checking json file %s for template: %v", jsonPath, err) + return nil, fmt.Errorf("error checking json file %s for template: %w", jsonPath, err) } b.log.Debugf("Loading json template from file %s", jsonPath) content, err := ioutil.ReadFile(jsonPath) if err != nil { - return nil, fmt.Errorf("error reading file %s for template: %v", jsonPath, err) + return nil, fmt.Errorf("error reading file %s for template: %w", jsonPath, err) } var body map[string]interface{} err = json.Unmarshal(content, &body) if err != nil { - return nil, fmt.Errorf("could not unmarshal json template: %s", err) + return nil, fmt.Errorf("could not unmarshal json template: %w", err) } return body, nil } @@ -235,7 +281,7 @@ func (b *templateBuilder) buildBodyFromFile(tmpl *Template, config TemplateConfi fieldsPath := paths.Resolve(paths.Config, config.Fields) body, err := tmpl.LoadFile(fieldsPath) if err != nil { - return nil, fmt.Errorf("error creating template from file %s: %v", fieldsPath, err) + return nil, fmt.Errorf("error creating template from file %s: %w", fieldsPath, err) } return body, nil } @@ -244,7 +290,7 @@ func (b *templateBuilder) buildBodyFromFields(tmpl *Template, fields []byte) (co b.log.Debug("Load default fields") body, err := tmpl.LoadBytes(fields) if err != nil { - return nil, fmt.Errorf("error creating template: %v", err) + return nil, fmt.Errorf("error creating template: %w", err) } return body, nil } diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 7f80f84d99da..3390e1680565 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -25,8 +25,11 @@ import ( "fmt" "io/ioutil" "math/rand" + "net/http" + "net/http/httptest" "path/filepath" "strconv" + "strings" "testing" "time" @@ -67,12 +70,23 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { t.Fatal(err) } s := testSetup{t: t, client: client, loader: NewESLoader(client), config: cfg} + client.Request("DELETE", "/_data_stream/"+cfg.Name, "", nil, nil) + s.requireDataStreamDoesNotExist("") client.Request("DELETE", "/_index_template/"+cfg.Name, "", nil, nil) s.requireTemplateDoesNotExist("") return &s } +func newTestSetupWithESClient(t *testing.T, client ESClient, cfg TemplateConfig) *testSetup { + t.Helper() + if cfg.Name == "" { + cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) + } + return &testSetup{t: t, client: client, loader: NewESLoader(client), config: cfg} +} + func (ts *testSetup) mustLoadTemplate(body map[string]interface{}) { + ts.t.Helper() err := ts.loader.loadTemplate(ts.config.Name, body) require.NoError(ts.t, err) ts.requireTemplateExists("") @@ -108,6 +122,11 @@ func (ts *testSetup) requireTemplateExists(name string) { require.True(ts.t, exists, "template must exist: %s", name) } +func (ts *testSetup) cleanupDataStream(name string) { + ts.client.Request("DELETE", "/_data_stream/"+name, "", nil, nil) + ts.requireDataStreamDoesNotExist(name) +} + func (ts *testSetup) cleanupTemplate(name string) { ts.client.Request("DELETE", "/_index_template/"+name, "", nil, nil) ts.requireTemplateDoesNotExist(name) @@ -122,6 +141,48 @@ func (ts *testSetup) requireTemplateDoesNotExist(name string) { require.False(ts.t, exists, "template must not exist") } +func (ts *testSetup) requireDataStreamDoesNotExist(name string) { + if name == "" { + name = ts.config.Name + } + exists, err := ts.loader.checkExistsDatastream(name) + require.NoError(ts.t, err, "failed to query data stream status") + require.False(ts.t, exists, "data stream must not exist") +} + +func (ts *testSetup) sendTestEvent() { + evt := map[string]interface{}{ + "@timestamp": "2099-11-15T13:12:00", + "message": "my super important message", + } + c, _, err := ts.client.Request(http.MethodPut, "/"+ts.config.Name+"/_create/1", "", nil, evt) + require.NoError(ts.t, err) + require.Equal(ts.t, c, http.StatusCreated, "document must be created with id 1") + + // refresh index so the event becomes available immediately + _, _, err = ts.client.Request(http.MethodPost, "/"+ts.config.Name+"/_refresh", "", nil, nil) + require.NoError(ts.t, err) +} + +// requireTestEventPresent validates that the event is available +// returns the backing index of the event +func (ts *testSetup) requireTestEventPresent() string { + c, b, err := ts.client.Request("GET", "/"+ts.config.Name+"/_search", "", nil, nil) + require.NoError(ts.t, err) + require.Equal(ts.t, http.StatusOK, c) + + var resp eslegclient.SearchResults + err = json.Unmarshal(b, &resp) + require.Equal(ts.t, 1, resp.Hits.Total.Value, "the test event must be returned") + + idx := struct { + Index string `json:"_index"` + }{Index: ""} + err = json.Unmarshal(resp.Hits.Hits[0], &idx) + require.NoError(ts.t, err, "backing index name must be parsed") + return idx.Index +} + func TestESLoader_Load(t *testing.T) { t.Run("failure", func(t *testing.T) { t.Run("loading disabled", func(t *testing.T) { @@ -139,6 +200,55 @@ func TestESLoader_Load(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "version is not semver") }) + + t.Run("no Elasticsearch client", func(t *testing.T) { + setup := newTestSetupWithESClient(t, nil, TemplateConfig{Enabled: true}) + + beatInfo := beat.Info{Version: "9.9.9"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + require.Error(t, err) + require.Contains(t, err.Error(), "can not load template without active Elasticsearch client") + }) + + t.Run("cannot check template", func(t *testing.T) { + m := getMockElasticsearchClient(t, "HEAD", "/_index_template/", 500, []byte("cannot check template")) + setup := newTestSetupWithESClient(t, m, TemplateConfig{Enabled: true}) + + beatInfo := beat.Info{Version: "9.9.9"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + require.Error(t, err) + require.Contains(t, err.Error(), "failure while checking if template exists", "Load must return error because template cannot be checked") + }) + + t.Run("cannot load template", func(t *testing.T) { + m := getMockElasticsearchClient(t, "PUT", "/_index_template/", 503, []byte("cannot load template")) + setup := newTestSetupWithESClient(t, m, TemplateConfig{Enabled: true, Overwrite: true}) + + beatInfo := beat.Info{Version: "9.9.9"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to load template", "Load must return error because we cannot load the index template") + }) + + t.Run("cannot check data stream", func(t *testing.T) { + m := getMockElasticsearchClient(t, "GET", "/_data_stream/", 503, []byte("error checking data stream")) + setup := newTestSetupWithESClient(t, m, TemplateConfig{Enabled: true, Overwrite: true}) + + beatInfo := beat.Info{Version: "9.9.9"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to check data stream", "Load must return error because data stream cannot be checked") + }) + + t.Run("cannot load data stream", func(t *testing.T) { + m := getMockElasticsearchClient(t, "PUT", "/_data_stream/", 300, nil) + setup := newTestSetupWithESClient(t, m, TemplateConfig{Enabled: true, Overwrite: true}) + + beatInfo := beat.Info{Version: "9.9.9"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to put data stream", "Load must return error because data stream cannot be uploaded") + }) }) t.Run("overwrite", func(t *testing.T) { @@ -161,6 +271,25 @@ func TestESLoader_Load(t *testing.T) { tmpl := getTemplate(t, setup.client, setup.config.Name) assert.Equal(t, false, tmpl.SourceEnabled()) }) + + t.Run("preserve existing data stream even if overwriting templates is allowed", func(t *testing.T) { + fields, err := ioutil.ReadFile(path(t, []string{"testdata", "default_fields.yml"})) + require.NoError(t, err) + setup := newTestSetup(t, TemplateConfig{Enabled: true, Overwrite: true}) + setup.mustLoad(fields) + + exists, err := setup.loader.checkExistsDatastream(setup.config.Name) + require.True(t, exists, "data stream must exits") + + // send test event before reloading the template + setup.sendTestEvent() + backingIdx := setup.requireTestEventPresent() + + setup.mustLoad(fields) + + newBackingIdx := setup.requireTestEventPresent() + require.Equal(setup.t, backingIdx, newBackingIdx, "the event must be present in the same backing index") + }) }) t.Run("json.name", func(t *testing.T) { @@ -171,10 +300,11 @@ func TestESLoader_Load(t *testing.T) { // Load Template with same name, but different JSON.name and ensure it is used setup.config.JSON = struct { - Enabled bool `config:"enabled"` - Path string `config:"path"` - Name string `config:"name"` - }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: nameJSON} + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + IsDataStream bool `config:"data_stream"` + }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: nameJSON, IsDataStream: false} setup.load(nil) setup.requireTemplateExists(nameJSON) setup.cleanupTemplate(nameJSON) @@ -205,10 +335,21 @@ func TestESLoader_Load(t *testing.T) { }, "fields from json": { cfg: TemplateConfig{Enabled: true, JSON: struct { - Enabled bool `config:"enabled"` - Path string `config:"path"` - Name string `config:"name"` - }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: "json-template"}}, + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + IsDataStream bool `config:"data_stream"` + }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: "json-template", IsDataStream: false}}, + fields: fields, + properties: []string{"host_name"}, + }, + "fields from json with data stream": { + cfg: TemplateConfig{Enabled: true, JSON: struct { + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + IsDataStream bool `config:"data_stream"` + }{Enabled: true, Path: path(t, []string{"testdata", "fields-data-stream.json"}), Name: "json-ds", IsDataStream: true}}, fields: fields, properties: []string{"host_name"}, }, @@ -235,6 +376,9 @@ func TestESLoader_Load(t *testing.T) { } assert.ElementsMatch(t, properties, data.properties) } + if !data.cfg.JSON.Enabled || data.cfg.JSON.IsDataStream { + setup.cleanupDataStream(setup.config.Name) + } setup.cleanupTemplate(setup.config.Name) }) } @@ -419,3 +563,45 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { return conn } + +func getMockElasticsearchClient(t *testing.T, method, endpoint string, code int, body []byte) *eslegclient.Connection { + server := esMock(t, method, endpoint, code, body) + conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ + URL: server.URL, + Transport: httpcommon.DefaultHTTPTransportSettings(), + }) + require.NoError(t, err) + err = conn.Connect() + require.NoError(t, err) + return conn +} + +func esMock(t *testing.T, method, endpoint string, code int, body []byte) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"version":{"number":"5.0.0"}}`)) + return + } + + if r.Method == method && strings.HasPrefix(r.URL.Path, endpoint) { + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + w.Write(body) + return + } + + c := 200 + // if we are checking if the data stream is available, + // return 404 so the client will try to load it. + if r.Method == "GET" && strings.HasPrefix(r.URL.Path, "/_data_stream") { + c = 404 + } + w.WriteHeader(c) + if body != nil { + w.Header().Set("Content-Type", "application/json") + w.Write(body) + } + })) +} diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go index 429cf7382ab7..fd66fa2e603c 100644 --- a/libbeat/template/load_test.go +++ b/libbeat/template/load_test.go @@ -204,18 +204,17 @@ func TestFileLoader_Load(t *testing.T) { type: text analyzer: simple `), - wantErr: errors.New(`error creating template: inconsistent definitions for analyzers with the name "test_powershell"`), + wantErr: fmt.Errorf(`error creating template: %w`, errors.New(`inconsistent definitions for analyzers with the name "test_powershell"`)), }, } { t.Run(name, func(t *testing.T) { - fc, err := newFileClient(ver) - require.NoError(t, err) + fc := newFileClient(ver) fl := NewFileLoader(fc) cfg := DefaultConfig(info) cfg.Settings = test.settings - err = fl.Load(cfg, info, test.fields, false) + err := fl.Load(cfg, info, test.fields, false) require.Equal(t, test.wantErr, err) if err != nil { return @@ -235,8 +234,8 @@ type fileClient struct { component, name, body, ver string } -func newFileClient(ver string) (*fileClient, error) { - return &fileClient{ver: ver}, nil +func newFileClient(ver string) *fileClient { + return &fileClient{ver: ver} } func (c *fileClient) GetVersion() common.Version { diff --git a/libbeat/template/testdata/fields-data-stream.json b/libbeat/template/testdata/fields-data-stream.json new file mode 100644 index 000000000000..1226421fbd7b --- /dev/null +++ b/libbeat/template/testdata/fields-data-stream.json @@ -0,0 +1,19 @@ +{ + "data_stream": {}, + "index_patterns": ["json-ds"], + "template": { + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "_source": { + "enabled": false + }, + "properties": { + "host_name": { + "type": "keyword" + } + } + } + } +} diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index f8905d9a4248..20c4e6d13038 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -2076,6 +2076,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of metricbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 4b39046e4c70..5f2f24c06d21 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -1698,6 +1698,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of packetbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 4ebba6c88bab..ac54ff3510b1 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -1139,6 +1139,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of winlogbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index e212152c2a04..8fb193d3ecc9 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -1259,6 +1259,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of auditbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index dd7dff593f28..dc869588cfd8 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -4520,6 +4520,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of filebeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 20478c6aed68..5081df6499a2 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -982,6 +982,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of functionbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 9948b3a0b62b..59380e2050fd 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -1349,6 +1349,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of heartbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 135021697ae9..48f881ce1b35 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -2607,6 +2607,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of metricbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 54c5f87ea869..66dd59b39c6b 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -701,6 +701,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of osquerybeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 4b39046e4c70..5f2f24c06d21 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -1698,6 +1698,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of packetbeat as it might # overload your Elasticsearch with too many update requests. diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index f083fabb0e5d..3866783fd203 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -1141,6 +1141,9 @@ output.elasticsearch: # Name under which the template is stored in Elasticsearch #setup.template.json.name: "" +# Set this option if the JSON template is a data stream. +#setup.template.json.data_stream: false + # Overwrite existing template # Do not enable this option for more than one instance of winlogbeat as it might # overload your Elasticsearch with too many update requests.