feat[elastic output]: add elastic pipeline flags #10505

Merged
merged 14 commits into from
Feb 17, 2022
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
@@ -193,6 +193,7 @@ following works:
- github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE)
- github.com/newrelic/newrelic-telemetry-sdk-go [Apache License 2.0](https://github.com/newrelic/newrelic-telemetry-sdk-go/blob/master/LICENSE.md)
- github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE)
- github.com/olivere/elastic [MIT License](https://github.com/olivere/elastic/blob/release-branch.v7/LICENSE)
- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE)
- github.com/opencontainers/go-digest [Apache License 2.0](https://github.com/opencontainers/go-digest/blob/master/LICENSE)
- github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE)
1 change: 1 addition & 0 deletions go.mod
@@ -100,6 +100,7 @@ require (
github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483
github.com/newrelic/newrelic-telemetry-sdk-go v0.5.1
github.com/nsqio/go-nsq v1.1.0
github.com/olivere/elastic v6.2.37+incompatible
powersj marked this conversation as resolved.
github.com/openconfig/gnmi v0.0.0-20180912164834-33a1865c3029
github.com/opentracing/opentracing-go v1.2.0
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5
1 change: 1 addition & 0 deletions go.sum
@@ -1665,6 +1665,7 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olivere/elastic v6.2.35+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U=
github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
12 changes: 12 additions & 0 deletions plugins/outputs/elasticsearch/README.md
@@ -210,6 +210,16 @@ This plugin will format the events in the following way:
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0

## Pipeline Config
## To use an ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the pipeline name. If the tag does not exist,
## the default pipeline will be used as the pipeline. If no default pipeline is set,
## no pipeline is used for the metric.
# use_pipeline = "{{es_pipeline}}"
Comment on lines +215 to +221
Contributor

We usually only mention a variable once in the sample config. Could you merge the two sections describing "use_pipeline" into one and remove one of the commented settings? It's just a project readme style that makes all the various plugins look more uniform.

Also maybe move the description of default_pipeline to be right before that setting?
Thanks

Contributor Author

I was trying to follow the same format as the index name section. This can be merged, though; I just wanted to make the difference between using a static value and a dynamic value clear.

Contributor

@reimda are you good with this as-is?

Contributor

I'm ok with bending the "one mention" rule to make it more clear, especially since index_name does it already in this plugin.

# default_pipeline = "my_pipeline"
```

### Permissions
@@ -249,6 +259,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `force_document_id`: If set to true, computes a unique hash as sha256(concat(timestamp,measurement,series-hash)), which enables resending or updating data without duplicating documents in ES.
* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value`
* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value.
* `use_pipeline`: If set, this value is used as the ingest pipeline to call when sending events to Elasticsearch. You can also specify dynamic pipeline names by using tags with the notation ```{{tag_name}}```. If the tag does not exist in a particular metric, the `default_pipeline` is used instead.
* `default_pipeline`: When `use_pipeline` contains a dynamic pipeline name and the tag does not exist in a particular metric, this value is used instead. If it is not set, no pipeline is used for that metric.
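
The fallback rules described for `use_pipeline` and `default_pipeline` can be sketched in a few lines of Go. This is a minimal, standalone illustration and not the plugin's code: `resolvePipeline` and the `tagRef` pattern are hypothetical names invented for the example, and the tag values are made up.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// tagRef matches a {{tag_name}} placeholder. The pattern is an assumption
// made for this sketch; it is not taken from the plugin.
var tagRef = regexp.MustCompile(`\{\{([^{}]+)\}\}`)

// resolvePipeline is a hypothetical helper that mirrors the documented rules:
//   - empty usePipeline: no pipeline ("")
//   - static name: returned unchanged
//   - {{tag}} placeholders: replaced by the metric's tag values
//   - any referenced tag missing: defaultPipeline (which may itself be "")
func resolvePipeline(usePipeline, defaultPipeline string, tags map[string]string) string {
	if usePipeline == "" {
		return ""
	}
	missing := false
	name := tagRef.ReplaceAllStringFunc(usePipeline, func(ref string) string {
		key := strings.TrimSpace(ref[2 : len(ref)-2]) // strip "{{" and "}}"
		value, ok := tags[key]
		if !ok {
			missing = true
		}
		return value
	})
	if missing {
		return defaultPipeline
	}
	return name
}

func main() {
	tags := map[string]string{"host": "web01", "es_pipeline": "nginx"}

	fmt.Println(resolvePipeline("my_pipeline", "fallback", tags))     // my_pipeline
	fmt.Println(resolvePipeline("{{es_pipeline}}", "fallback", tags)) // nginx
	fmt.Println(resolvePipeline("{{absent}}", "fallback", tags))      // fallback
}
```

An empty result means the request should be sent without a pipeline, which corresponds to the "no pipeline is used for the metric" case in the sample config.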

## Known issues

85 changes: 62 additions & 23 deletions plugins/outputs/elasticsearch/elasticsearch.go
@@ -13,7 +13,7 @@ import (

"crypto/sha256"

"gopkg.in/olivere/elastic.v5"
"github.com/olivere/elastic"

powersj marked this conversation as resolved.
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
@@ -23,23 +23,27 @@ import (

type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
DefaultTagValue string
TagKeys []string
Username string
Password string
AuthBearerToken string
EnableSniffer bool
Timeout config.Duration
HealthCheckInterval config.Duration
EnableGzip bool
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
ForceDocumentID bool `toml:"force_document_id"`
MajorReleaseNumber int
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
IndexName string `toml:"index_name"`
DefaultTagValue string `toml:"default_tag_value"`
tagKeys []string
Member

Can you maybe group the "user-option" fields and the internal fields here? I.e. move tagKeys and majorReleaseNumber at the end of the struct? That's what most other plugins look like...

Contributor Author

I couldn't decide before whether they should be done that way, or alphabetically, or... so I ended up leaving it as it was. Now they are ordered alphabetically and grouped by user config, locals, then core structs.

Username string `toml:"username"`
Password string `toml:"password"`
AuthBearerToken string `toml:"auth_bearer_token"`
EnableSniffer bool `toml:"enable_sniffer"`
Timeout config.Duration `toml:"timeout"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
EnableGzip bool `toml:"enable_gzip"`
ManageTemplate bool `toml:"manage_template"`
TemplateName string `toml:"template_name"`
OverwriteTemplate bool `toml:"overwrite_template"`
ForceDocumentID bool `toml:"force_document_id"`
majorReleaseNumber int
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
UsePipeline string `toml:"use_pipeline"`
DefaultPipeline string `toml:"default_pipeline"`
pipelineTagKeys []string
pipelineName string
Log telegraf.Logger `toml:"-"`
tls.ClientConfig

@@ -111,6 +115,16 @@ var sampleConfig = `
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0

## Pipeline Config
## To use an ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the pipeline name. If the tag does not exist,
## the default pipeline will be used as the pipeline. If no default pipeline is set,
## no pipeline is used for the metric.
# use_pipeline = "{{es_pipeline}}"
# default_pipeline = "my_pipeline"
`

const telegrafTemplate = `
@@ -271,7 +285,7 @@ func (a *Elasticsearch) Connect() error {
a.Log.Infof("Elasticsearch version: %q", esVersion)

a.Client = client
a.MajorReleaseNumber = majorReleaseNumber
a.majorReleaseNumber = majorReleaseNumber

if a.ManageTemplate {
err := a.manageTemplate(ctx)
@@ -280,7 +294,8 @@ func (a *Elasticsearch) Connect() error {
}
}

a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
a.IndexName, a.tagKeys = a.GetTagKeys(a.IndexName)
a.pipelineName, a.pipelineTagKeys = a.GetTagKeys(a.UsePipeline)

return nil
}
@@ -309,7 +324,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {

// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.tagKeys, metric.Tags())

// Handle NaN and inf field-values
fields := make(map[string]interface{})
@@ -344,10 +359,16 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
br.Id(id)
}

if a.MajorReleaseNumber <= 6 {
if a.majorReleaseNumber <= 6 {
br.Type("metrics")
}

if a.UsePipeline != "" {
if pipelineName := a.getPipelineName(a.pipelineName, a.pipelineTagKeys, metric.Tags()); pipelineName != "" {
br.Pipeline(pipelineName)
}
}

bulkRequest.Add(br)
}

@@ -399,7 +420,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.MajorReleaseNumber,
Version: a.majorReleaseNumber,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
@@ -475,6 +496,24 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK
return fmt.Sprintf(indexName, tagValues...)
}

func (a *Elasticsearch) getPipelineName(pipelineInput string, tagKeys []string, metricTags map[string]string) string {
	if !strings.Contains(pipelineInput, "%") || len(tagKeys) == 0 {
		return pipelineInput
	}

	var tagValues []interface{}

	for _, key := range tagKeys {
		if value, ok := metricTags[key]; ok {
			tagValues = append(tagValues, value)
			continue
		}
		a.Log.Debugf("Tag %s not found, reverting to default pipeline instead.", key)
		return a.DefaultPipeline
	}
	return fmt.Sprintf(pipelineInput, tagValues...)
}

func getISOWeek(eventTime time.Time) string {
_, week := eventTime.ISOWeek()
return strconv.Itoa(week)
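
The change to `elasticsearch.go` above resolves the pipeline in two steps: `Connect` calls `GetTagKeys` once on `use_pipeline`, and `Write` calls `getPipelineName` for every metric. The body of `GetTagKeys` is not part of this diff, so the sketch below assumes it rewrites each `{{tag}}` placeholder to `%s` and records the tag names, which is what the `strings.Contains(pipelineInput, "%")` check suggests. `compiledTemplate`, `compile`, and `render` are hypothetical names, and escaping literal `%` characters is an addition of this sketch, not something the diff shows.

```go
package main

import (
	"fmt"
	"strings"
)

// compiledTemplate holds the result of parsing a "{{tag}}" template once.
// The type and its fields are hypothetical; they are not part of the plugin.
type compiledTemplate struct {
	format string   // template with each {{tag}} replaced by %s
	keys   []string // tag names in order of appearance
}

// compile turns "{{a}}-{{b}}" into the format "%s-%s" and the keys [a b].
// Literal '%' characters are escaped so fmt.Sprintf leaves them alone.
func compile(template string) compiledTemplate {
	var b strings.Builder
	var keys []string
	rest := template
	for {
		start := strings.Index(rest, "{{")
		if start < 0 {
			break
		}
		end := strings.Index(rest[start:], "}}")
		if end < 0 {
			break // unterminated placeholder: keep the remainder as literal text
		}
		end += start
		b.WriteString(strings.ReplaceAll(rest[:start], "%", "%%"))
		b.WriteString("%s")
		keys = append(keys, strings.TrimSpace(rest[start+2:end]))
		rest = rest[end+2:]
	}
	b.WriteString(strings.ReplaceAll(rest, "%", "%%"))
	return compiledTemplate{format: b.String(), keys: keys}
}

// render fills the template from a metric's tags. It returns fallback as soon
// as a referenced tag is missing.
func (c compiledTemplate) render(tags map[string]string, fallback string) string {
	values := make([]interface{}, 0, len(c.keys))
	for _, key := range c.keys {
		value, ok := tags[key]
		if !ok {
			return fallback
		}
		values = append(values, value)
	}
	return fmt.Sprintf(c.format, values...)
}

func main() {
	// Parsed once, as Connect would do.
	c := compile("{{tag1}}-{{es-pipeline}}")

	// Rendered per metric, as Write would do.
	fmt.Println(c.render(map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, "myDefaultPipeline"))
	fmt.Println(c.render(map[string]string{"tag1": "value1"}, "myDefaultPipeline"))
}
```

Parsing once and formatting per metric keeps the per-metric work down to a map lookup for each tag plus one `fmt.Sprintf` call.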
143 changes: 143 additions & 0 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
@@ -412,6 +412,149 @@ func TestGetIndexName(t *testing.T) {
}
}

func TestGetPipelineName(t *testing.T) {
e := &Elasticsearch{
UsePipeline: "{{es-pipeline}}",
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
}
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

tests := []struct {
EventTime time.Time
Tags map[string]string
PipelineTagKeys []string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"myDefaultPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"myDefaultPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
"myOtherPipeline",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"pipeline2",
},
}
for _, test := range tests {
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}

// Setup testing for testing no pipeline set. All the tests in this case should return "".
e = &Elasticsearch{
Log: testutil.Logger{},
}
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

for _, test := range tests {
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, "", pipelineName)
}
}

func TestPipelineConfigs(t *testing.T) {
tests := []struct {
EventTime time.Time
Tags map[string]string
PipelineTagKeys []string
Expected string
Elastic *Elasticsearch
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
"myDefaultPipeline",
&Elasticsearch{
UsePipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"pipeline2",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"value1-pipeline2",
&Elasticsearch{
UsePipeline: "{{tag1}}-{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1"},
[]string{},
"",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
}

for _, test := range tests {
e := test.Elastic
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}
}

func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {