Skip to content

Commit

Permalink
feat[elastic output]: add elastic pipeline flags (#10505)
Browse files Browse the repository at this point in the history
  • Loading branch information
zpriddy authored Feb 17, 2022
1 parent 3c600cd commit a60027a
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 21 deletions.
12 changes: 12 additions & 0 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ POST https://es.us-east-1.amazonaws.com/2021-01-01/opensearch/upgradeDomain
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0

## Pipeline Config
## To use a ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the pipeline name. If the tag does not exist,
## the default pipeline will be used as the pipeline. If no default pipeline is set,
## no pipeline is used for the metric.
# use_pipeline = "{{es_pipeline}}"
# default_pipeline = "my_pipeline"
```

### Permissions
Expand Down Expand Up @@ -286,6 +296,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.
* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value`
* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value.
* `use_pipeline`: If set, the set value will be used as the pipeline to call when sending events to elasticsearch. Additionally, you can specify dynamic pipeline names by using tags with the notation ```{{tag_name}}```. If the tag does not exist in a particular metric, the `default_pipeline` will be used instead.
* `default_pipeline`: If dynamic pipeline names the tag does not exist in a particular metric, this value will be used instead.

## Known issues

Expand Down
81 changes: 60 additions & 21 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,29 @@ import (
)

type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
DefaultTagValue string
TagKeys []string
Username string
Password string
AuthBearerToken string
EnableSniffer bool
Timeout config.Duration
HealthCheckInterval config.Duration
EnableGzip bool
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
ForceDocumentID bool `toml:"force_document_id"`
MajorReleaseNumber int
AuthBearerToken string `toml:"auth_bearer_token"`
DefaultPipeline string `toml:"default_pipeline"`
DefaultTagValue string `toml:"default_tag_value"`
EnableGzip bool `toml:"enable_gzip"`
EnableSniffer bool `toml:"enable_sniffer"`
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
ForceDocumentID bool `toml:"force_document_id"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
IndexName string `toml:"index_name"`
ManageTemplate bool `toml:"manage_template"`
OverwriteTemplate bool `toml:"overwrite_template"`
Password string `toml:"password"`
TemplateName string `toml:"template_name"`
Timeout config.Duration `toml:"timeout"`
URLs []string `toml:"urls"`
UsePipeline string `toml:"use_pipeline"`
Username string `toml:"username"`
Log telegraf.Logger `toml:"-"`
majorReleaseNumber int
pipelineName string
pipelineTagKeys []string
tagKeys []string
tls.ClientConfig

Client *elastic.Client
Expand Down Expand Up @@ -112,6 +116,16 @@ var sampleConfig = `
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
## Pipeline Config
## To use a ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the pipeline name. If the tag does not exist,
## the default pipeline will be used as the pipeline. If no default pipeline is set,
## no pipeline is used for the metric.
# use_pipeline = "{{es_pipeline}}"
# default_pipeline = "my_pipeline"
`

const telegrafTemplate = `
Expand Down Expand Up @@ -278,7 +292,7 @@ func (a *Elasticsearch) Connect() error {
a.Log.Infof("Elasticsearch version: %q", esVersion)

a.Client = client
a.MajorReleaseNumber = majorReleaseNumber
a.majorReleaseNumber = majorReleaseNumber

if a.ManageTemplate {
err := a.manageTemplate(ctx)
Expand All @@ -287,7 +301,8 @@ func (a *Elasticsearch) Connect() error {
}
}

a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
a.IndexName, a.tagKeys = a.GetTagKeys(a.IndexName)
a.pipelineName, a.pipelineTagKeys = a.GetTagKeys(a.UsePipeline)

return nil
}
Expand Down Expand Up @@ -316,7 +331,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {

// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.tagKeys, metric.Tags())

// Handle NaN and inf field-values
fields := make(map[string]interface{})
Expand Down Expand Up @@ -351,10 +366,16 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
br.Id(id)
}

if a.MajorReleaseNumber <= 6 {
if a.majorReleaseNumber <= 6 {
br.Type("metrics")
}

if a.UsePipeline != "" {
if pipelineName := a.getPipelineName(a.pipelineName, a.pipelineTagKeys, metric.Tags()); pipelineName != "" {
br.Pipeline(pipelineName)
}
}

bulkRequest.Add(br)
}

Expand Down Expand Up @@ -406,7 +427,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.MajorReleaseNumber,
Version: a.majorReleaseNumber,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
Expand Down Expand Up @@ -482,6 +503,24 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK
return fmt.Sprintf(indexName, tagValues...)
}

func (a *Elasticsearch) getPipelineName(pipelineInput string, tagKeys []string, metricTags map[string]string) string {
if !strings.Contains(pipelineInput, "%") || len(tagKeys) == 0 {
return pipelineInput
}

var tagValues []interface{}

for _, key := range tagKeys {
if value, ok := metricTags[key]; ok {
tagValues = append(tagValues, value)
continue
}
a.Log.Debugf("Tag %s not found, reverting to default pipeline instead.", key)
return a.DefaultPipeline
}
return fmt.Sprintf(pipelineInput, tagValues...)
}

func getISOWeek(eventTime time.Time) string {
_, week := eventTime.ISOWeek()
return strconv.Itoa(week)
Expand Down
143 changes: 143 additions & 0 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,149 @@ func TestGetIndexName(t *testing.T) {
}
}

func TestGetPipelineName(t *testing.T) {
e := &Elasticsearch{
UsePipeline: "{{es-pipeline}}",
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
}
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

tests := []struct {
EventTime time.Time
Tags map[string]string
PipelineTagKeys []string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"myDefaultPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"myDefaultPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
"myOtherPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"pipeline2",
},
}
for _, test := range tests {
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}

// Setup testing for testing no pipeline set. All the tests in this case should return "".
e = &Elasticsearch{
Log: testutil.Logger{},
}
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

for _, test := range tests {
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, "", pipelineName)
}
}

func TestPipelineConfigs(t *testing.T) {
tests := []struct {
EventTime time.Time
Tags map[string]string
PipelineTagKeys []string
Expected string
Elastic *Elasticsearch
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
"myDefaultPipeline",
&Elasticsearch{
UsePipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"pipeline2",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"value1-pipeline2",
&Elasticsearch{
UsePipeline: "{{tag1}}-{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1"},
[]string{},
"",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
}

for _, test := range tests {
e := test.Elastic
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}
}

func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
Expand Down

0 comments on commit a60027a

Please sign in to comment.