Skip to content

Commit

Permalink
Put data stream so there is no need for additional permissions (#31048)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds a new step to loading templates. Now not only the template is loaded, but the data stream is created as well. Given that users might load templates from JSON file that are not data streams, I added a new option called `setup.template.json.data_stream`. It has to be set, if the JSON template is a data stream.

## Why is it important?

Without this change users needed more permissions to publish events. Now `create_doc` priviledge is  enough to publish events to the data stream.

Closes #30647
Closes #30567
  • Loading branch information
kvch authored Apr 6, 2022
1 parent 9993939 commit 5cdb312
Show file tree
Hide file tree
Showing 23 changed files with 336 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452]
- Update docker/distribution dependency library to fix a security issues concerning OCI Manifest Type Confusion Issue. {pull}30462[30462]
- Fix dissect trim panics from DELETE (127)(\u007f) character {issue}30657[30657] {pull}30658[30658]
- Load data stream during setup, so users do not need extra permissions during publishing. {issue}30647[30647] {pull}31048[31048]
- Add ecs container fields {pull}31020[31020]
- Fix docs reference for syslog processor {pull}31087[31087]

Expand Down Expand Up @@ -99,6 +100,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add syslog parser and processor. {issue}30139[30139] {pull}30541[30541]
- Add action_input_type for the .fleet-actions-results {pull}30562[30562]
- Add cronjob metadata by default {pull}30637[30637]
- New option `setup.template.json.data_stream` is added to indicate if the JSON index template is a data stream. {pull}31048[31048]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,9 @@ output.elasticsearch:
# Name under which the template is stored in Elasticsearch
#setup.template.json.name: ""

# Set this option if the JSON template is a data stream.
#setup.template.json.data_stream: false

# Overwrite existing template
# Do not enable this option for more than one instance of auditbeat as it might
# overload your Elasticsearch with too many update requests.
Expand Down
3 changes: 3 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,9 @@ output.elasticsearch:
# Name under which the template is stored in Elasticsearch
#setup.template.json.name: ""

# Set this option if the JSON template is a data stream.
#setup.template.json.data_stream: false

# Overwrite existing template
# Do not enable this option for more than one instance of filebeat as it might
# overload your Elasticsearch with too many update requests.
Expand Down
3 changes: 3 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,9 @@ output.elasticsearch:
# Name under which the template is stored in Elasticsearch
#setup.template.json.name: ""

# Set this option if the JSON template is a data stream.
#setup.template.json.data_stream: false

# Overwrite existing template
# Do not enable this option for more than one instance of heartbeat as it might
# overload your Elasticsearch with too many update requests.
Expand Down
3 changes: 3 additions & 0 deletions libbeat/_meta/config/setup.template.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
# Name under which the template is stored in Elasticsearch
#setup.template.json.name: ""

# Set this option if the JSON template is a data stream.
#setup.template.json.data_stream: false

# Overwrite existing template
# Do not enable this option for more than one instance of {{.BeatName}} as it might
# overload your Elasticsearch with too many update requests.
Expand Down
3 changes: 3 additions & 0 deletions libbeat/docs/template-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ set the name of the template.
setup.template.json.enabled: true
setup.template.json.path: "template.json"
setup.template.json.name: "template-name
setup.template.json.data_stream: false
----------------------------------------------------------------------

NOTE: If the JSON template is used, the `fields.yml` is skipped for the template
generation.

NOTE: If the JSON template is a data stream, set `setup.template.json.data_stream`.
17 changes: 10 additions & 7 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {

u, err := url.Parse(s.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err)
return nil, fmt.Errorf("failed to parse elasticsearch URL: %w", err)
}

if u.User != nil {
Expand Down Expand Up @@ -242,13 +242,16 @@ func NewConnectedClient(cfg *common.Config, beatname string) (*Connection, error
// the configured host, updates the known Elasticsearch version and calls
// globally configured handlers.
func (conn *Connection) Connect() error {
if conn.log == nil {
conn.log = logp.NewLogger("esclientleg")
}
if err := conn.getVersion(); err != nil {
return err
}

if conn.OnConnectCallback != nil {
if err := conn.OnConnectCallback(); err != nil {
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %w", err)
}
}

Expand All @@ -266,7 +269,7 @@ func (conn *Connection) Ping() (string, error) {
}

if status >= 300 {
return "", fmt.Errorf("Non 2xx response code: %d", status)
return "", fmt.Errorf("non 2xx response code: %d", status)
}

var response struct {
Expand All @@ -277,7 +280,7 @@ func (conn *Connection) Ping() (string, error) {

err = json.Unmarshal(body, &response)
if err != nil {
return "", fmt.Errorf("Failed to parse JSON response: %v", err)
return "", fmt.Errorf("failed to parse JSON response: %w", err)
}

conn.log.Debugf("Ping status code: %v", status)
Expand Down Expand Up @@ -362,7 +365,7 @@ func (conn *Connection) execRequest(
method, url string,
body io.Reader,
) (int, []byte, error) {
req, err := http.NewRequest(method, url, body)
req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour
if err != nil {
conn.log.Warnf("Failed to create request %+v", err)
return 0, nil, err
Expand All @@ -376,7 +379,7 @@ func (conn *Connection) execRequest(
// GetVersion returns the elasticsearch version the client is connected to.
func (conn *Connection) GetVersion() common.Version {
if !conn.version.IsValid() {
conn.getVersion()
_ = conn.getVersion()
}

return conn.version
Expand All @@ -402,7 +405,7 @@ func (conn *Connection) getVersion() error {
func (conn *Connection) LoadJSON(path string, json map[string]interface{}) ([]byte, error) {
status, body, err := conn.Request("PUT", path, "", nil, json)
if err != nil {
return body, fmt.Errorf("couldn't load json. Error: %s", err)
return body, fmt.Errorf("couldn't load json. Error: %w", err)
}
if status > 300 {
return body, fmt.Errorf("couldn't load json. Status: %v", status)
Expand Down
7 changes: 4 additions & 3 deletions libbeat/template/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type TemplateConfig struct {
Pattern string `config:"pattern"`
Fields string `config:"fields"`
JSON struct {
Enabled bool `config:"enabled"`
Path string `config:"path"`
Name string `config:"name"`
Enabled bool `config:"enabled"`
Path string `config:"path"`
Name string `config:"name"`
IsDataStream bool `config:"data_stream"`
} `config:"json"`
AppendFields mapping.Fields `config:"append_fields"`
Overwrite bool `config:"overwrite"`
Expand Down
62 changes: 54 additions & 8 deletions libbeat/template/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,30 @@ func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, mi
return fmt.Errorf("failed to load template: %w", err)
}
l.log.Infof("Template with name %q loaded.", templateName)

// if JSON template is loaded and it is not a data stream
// we are done with loading.
if config.JSON.Enabled && !config.JSON.IsDataStream {
return nil
}

// If a data stream already exists, we do not attempt to delete or overwrite
// it because it would delete all backing indices, and the user would lose all
// their documents.
dataStreamExist, err := l.checkExistsDatastream(templateName)
if err != nil {
return fmt.Errorf("failed to check data stream: %w", err)
}
if dataStreamExist {
l.log.Infof("Data stream with name %q already exists.", templateName)
return nil
}

if err := l.putDataStream(templateName); err != nil {
return fmt.Errorf("failed to put data stream: %w", err)
}
l.log.Infof("Data stream with name %q loaded.", templateName)

return nil
}

Expand All @@ -135,14 +159,36 @@ func (l *ESLoader) loadTemplate(templateName string, template map[string]interfa
path := "/_index_template/" + templateName
status, body, err := l.client.Request("PUT", path, "", nil, template)
if err != nil {
return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body)
return fmt.Errorf("couldn't load template: %w. Response body: %s", err, body)
}
if status > http.StatusMultipleChoices { //http status 300
return fmt.Errorf("couldn't load json. Status: %v", status)
}
return nil
}

func (l *ESLoader) checkExistsDatastream(name string) (bool, error) {
status, _, err := l.client.Request("GET", "/_data_stream/"+name, "", nil, nil)
if status == http.StatusNotFound {
return false, nil
}
if err != nil {
return false, err
}

return true, nil
}

func (l *ESLoader) putDataStream(name string) error {
l.log.Infof("Try loading data stream %s to Elasticsearch", name)
path := "/_data_stream/" + name
_, body, err := l.client.Request("PUT", path, "", nil, nil)
if err != nil {
return fmt.Errorf("could not put data stream: %w. Response body: %s", err, body)
}
return nil
}

// existsTemplate checks if a given template already exist, using the
// `/_index_template/<name>` API.
//
Expand Down Expand Up @@ -176,7 +222,7 @@ func (l *FileLoader) Load(config TemplateConfig, info beat.Info, fields []byte,

str := fmt.Sprintf("%s\n", body.StringToPrint())
if err := l.client.Write("template", tmpl.name, str); err != nil {
return fmt.Errorf("error printing template: %v", err)
return fmt.Errorf("error printing template: %w", err)
}
return nil
}
Expand All @@ -188,7 +234,7 @@ func (b *templateBuilder) template(config TemplateConfig, info beat.Info, esVers
}
tmpl, err := New(info.Version, info.IndexPrefix, info.ElasticLicensed, esVersion, config, migration)
if err != nil {
return nil, fmt.Errorf("error creating template instance: %v", err)
return nil, fmt.Errorf("error creating template instance: %w", err)
}
return tmpl, nil
}
Expand All @@ -214,18 +260,18 @@ func (b *templateBuilder) buildBody(tmpl *Template, config TemplateConfig, field
func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (common.MapStr, error) {
jsonPath := paths.Resolve(paths.Config, config.JSON.Path)
if _, err := os.Stat(jsonPath); err != nil {
return nil, fmt.Errorf("error checking json file %s for template: %v", jsonPath, err)
return nil, fmt.Errorf("error checking json file %s for template: %w", jsonPath, err)
}
b.log.Debugf("Loading json template from file %s", jsonPath)
content, err := ioutil.ReadFile(jsonPath)
if err != nil {
return nil, fmt.Errorf("error reading file %s for template: %v", jsonPath, err)
return nil, fmt.Errorf("error reading file %s for template: %w", jsonPath, err)

}
var body map[string]interface{}
err = json.Unmarshal(content, &body)
if err != nil {
return nil, fmt.Errorf("could not unmarshal json template: %s", err)
return nil, fmt.Errorf("could not unmarshal json template: %w", err)
}
return body, nil
}
Expand All @@ -235,7 +281,7 @@ func (b *templateBuilder) buildBodyFromFile(tmpl *Template, config TemplateConfi
fieldsPath := paths.Resolve(paths.Config, config.Fields)
body, err := tmpl.LoadFile(fieldsPath)
if err != nil {
return nil, fmt.Errorf("error creating template from file %s: %v", fieldsPath, err)
return nil, fmt.Errorf("error creating template from file %s: %w", fieldsPath, err)
}
return body, nil
}
Expand All @@ -244,7 +290,7 @@ func (b *templateBuilder) buildBodyFromFields(tmpl *Template, fields []byte) (co
b.log.Debug("Load default fields")
body, err := tmpl.LoadBytes(fields)
if err != nil {
return nil, fmt.Errorf("error creating template: %v", err)
return nil, fmt.Errorf("error creating template: %w", err)
}
return body, nil
}
Expand Down
Loading

0 comments on commit 5cdb312

Please sign in to comment.