Skip to content

Commit

Permalink
feat(outputs.elasticsearch): Allow custom template index settings
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed Jun 10, 2024
1 parent 05ba495 commit 39519b2
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 37 deletions.
10 changes: 10 additions & 0 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ to use them.
# To pass custom HTTP headers please define it in a given below section
# [outputs.elasticsearch.headers]
# "X-Custom-Header" = "custom-value"

## Template Index Settings
## Overrides the template settings.index section with any provided options.
## Defaults provided here in the config
# template_index_settings = {
# refresh_interval = "10s",
# mapping.total_fields.limit = 5000,
# auto_expand_replicas = "0-1",
# codec = "best_compression"
# }
```

### Permissions
Expand Down
100 changes: 63 additions & 37 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"crypto/sha256"
_ "embed"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -28,27 +29,28 @@ import (
var sampleConfig string

type Elasticsearch struct {
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
DefaultPipeline string `toml:"default_pipeline"`
DefaultTagValue string `toml:"default_tag_value"`
EnableGzip bool `toml:"enable_gzip"`
EnableSniffer bool `toml:"enable_sniffer"`
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
ForceDocumentID bool `toml:"force_document_id"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
IndexName string `toml:"index_name"`
ManageTemplate bool `toml:"manage_template"`
OverwriteTemplate bool `toml:"overwrite_template"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
TemplateName string `toml:"template_name"`
Timeout config.Duration `toml:"timeout"`
URLs []string `toml:"urls"`
UsePipeline string `toml:"use_pipeline"`
Headers map[string]string `toml:"headers"`
Log telegraf.Logger `toml:"-"`
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
DefaultPipeline string `toml:"default_pipeline"`
DefaultTagValue string `toml:"default_tag_value"`
EnableGzip bool `toml:"enable_gzip"`
EnableSniffer bool `toml:"enable_sniffer"`
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
ForceDocumentID bool `toml:"force_document_id"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
IndexName string `toml:"index_name"`
IndexTemplate map[string]interface{} `toml:"template_index_settings"`
ManageTemplate bool `toml:"manage_template"`
OverwriteTemplate bool `toml:"overwrite_template"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
TemplateName string `toml:"template_name"`
Timeout config.Duration `toml:"timeout"`
URLs []string `toml:"urls"`
UsePipeline string `toml:"use_pipeline"`
Headers map[string]string `toml:"headers"`
Log telegraf.Logger `toml:"-"`
majorReleaseNumber int
pipelineName string
pipelineTagKeys []string
Expand All @@ -66,12 +68,7 @@ const telegrafTemplate = `
"index_patterns" : [ "{{.TemplatePattern}}" ],
{{ end }}
"settings": {
"index": {
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000,
"auto_expand_replicas" : "0-1",
"codec" : "best_compression"
}
"index": {{.IndexTemplate}}
},
"mappings" : {
{{ if (lt .Version 7) }}
Expand Down Expand Up @@ -128,9 +125,18 @@ const telegrafTemplate = `
}
}`

const defaultTemplateIndexSettings = `
{
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000,
"auto_expand_replicas": "0-1",
"codec": "best_compression"
}`

type templatePart struct {
TemplatePattern string
Version int
IndexTemplate string
}

func (*Elasticsearch) SampleConfig() string {
Expand Down Expand Up @@ -369,19 +375,12 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
}

if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.majorReleaseNumber,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
var tmpl bytes.Buffer

if err := t.Execute(&tmpl, tp); err != nil {
data, err := a.createNewTemplate(templatePattern)
if err != nil {
return err
}
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)

_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(data.String()).Do(ctx)
if errCreateTemplate != nil {
return fmt.Errorf("elasticsearch failed to create index template %s: %w", a.TemplateName, errCreateTemplate)
}
Expand All @@ -393,6 +392,33 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return nil
}

func (a *Elasticsearch) createNewTemplate(templatePattern string) (*bytes.Buffer, error) {
var indexTemplate string
if a.IndexTemplate != nil {
data, err := json.Marshal(&a.IndexTemplate)
if err != nil {
return nil, fmt.Errorf("elasticsearch failed to create index settings for template %s: %w", a.TemplateName, err)
}
indexTemplate = string(data)
} else {
indexTemplate = defaultTemplateIndexSettings
}

tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.majorReleaseNumber,
IndexTemplate: indexTemplate,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
var tmpl bytes.Buffer

if err := t.Execute(&tmpl, tp); err != nil {
return nil, err
}
return &tmpl, nil
}

func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
tagKeys := []string{}
startTag := strings.Index(indexName, "{{")
Expand Down
49 changes: 49 additions & 0 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package elasticsearch

import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -755,3 +756,51 @@ func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) {
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestStandardIndexSettings(t *testing.T) {
e := &Elasticsearch{
TemplateName: "test",
IndexName: "telegraf-%Y.%m.%d",
Log: testutil.Logger{},
}
buf, err := e.createNewTemplate("test")
require.NoError(t, err)
var jsonData esTemplate
err = json.Unmarshal(buf.Bytes(), &jsonData)
require.NoError(t, err)
index := jsonData.Settings.Index
require.Equal(t, "10s", index["refresh_interval"])
require.Equal(t, float64(5000), index["mapping.total_fields.limit"])
require.Equal(t, "0-1", index["auto_expand_replicas"])
require.Equal(t, "best_compression", index["codec"])
}

func TestDifferentIndexSettings(t *testing.T) {
e := &Elasticsearch{
TemplateName: "test",
IndexName: "telegraf-%Y.%m.%d",
IndexTemplate: map[string]interface{}{
"refresh_interval": "20s",
"mapping.total_fields.limit": 1000,
"codec": "best_compression",
},
Log: testutil.Logger{},
}
buf, err := e.createNewTemplate("test")
require.NoError(t, err)
var jsonData esTemplate
err = json.Unmarshal(buf.Bytes(), &jsonData)
require.NoError(t, err)
index := jsonData.Settings.Index
require.Equal(t, "20s", index["refresh_interval"])
require.Equal(t, float64(1000), index["mapping.total_fields.limit"])
require.Equal(t, "best_compression", index["codec"])
}

type esTemplate struct {
Settings esSettings `json:"settings"`
}

type esSettings struct {
Index map[string]interface{} `json:"index"`
}
10 changes: 10 additions & 0 deletions plugins/outputs/elasticsearch/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,13 @@
# To pass custom HTTP headers please define it in a given below section
# [outputs.elasticsearch.headers]
# "X-Custom-Header" = "custom-value"

## Template Index Settings
## Overrides the template settings.index section with any provided options.
## Defaults provided here in the config
# template_index_settings = {
# refresh_interval = "10s",
# mapping.total_fields.limit = 5000,
# auto_expand_replicas = "0-1",
# codec = "best_compression"
# }

0 comments on commit 39519b2

Please sign in to comment.