Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.elasticsearch): Gather enrich stats #15688

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion plugins/inputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## For this to work, 'local' must be set to true
cluster_stats_only_from_master = true

## Gather stats from the enrich API
# enrich_stats = false

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
Expand All @@ -98,7 +101,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
Expand Down
65 changes: 65 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ type clusterHealth struct {
Indices map[string]indexHealth `json:"indices"`
}

// enrichStats holds the parts of the enrich stats API response
// (`/_enrich/_stats`) that are turned into metrics: one entry per node for
// the coordinator statistics and one entry per node for the enrich cache
// statistics.
//
// Every numeric value is decoded as int64. Several of them are cumulative
// totals that can exceed the 32-bit range, and a platform-width `int` would
// make JSON decoding fail on 32-bit builds once that happens.
type enrichStats struct {
	// CoordinatorStats lists the per-node coordinator counters.
	CoordinatorStats []struct {
		NodeID                string `json:"node_id"`
		QueueSize             int64  `json:"queue_size"`
		RemoteRequestsCurrent int64  `json:"remote_requests_current"`
		RemoteRequestsTotal   int64  `json:"remote_requests_total"`
		ExecutedSearchesTotal int64  `json:"executed_searches_total"`
	} `json:"coordinator_stats"`
	// CacheStats lists the per-node enrich cache counters.
	CacheStats []struct {
		NodeID    string `json:"node_id"`
		Count     int64  `json:"count"`
		Hits      int64  `json:"hits"`
		Misses    int64  `json:"misses"`
		Evictions int64  `json:"evictions"`
	} `json:"cache_stats"`
}

type indexHealth struct {
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
Expand Down Expand Up @@ -104,6 +121,7 @@ type Elasticsearch struct {
ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"`
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
EnrichStats bool `toml:"enrich_stats"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
NodeStats []string `toml:"node_stats"`
Expand Down Expand Up @@ -280,6 +298,13 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}
}

if e.EnrichStats {
if err := e.gatherEnrichStats(s+"/_enrich/_stats", acc); err != nil {
acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
}(serv, acc)
}

Expand Down Expand Up @@ -440,6 +465,46 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
return nil
}

// gatherEnrichStats loads the enrich statistics from url via gatherJSONData
// and reports them through acc. Each coordinator entry becomes one
// "elasticsearch_enrich_stats_coordinator" metric and each cache entry one
// "elasticsearch_enrich_stats_cache" metric, tagged with the entry's node_id.
// All metrics of a single call carry the same timestamp. The only error
// returned is the one produced by gatherJSONData.
func (e *Elasticsearch) gatherEnrichStats(url string, acc telegraf.Accumulator) error {
	var stats enrichStats
	if err := e.gatherJSONData(url, &stats); err != nil {
		return err
	}

	// Taken once, after the response has been decoded, so that every metric
	// emitted below shares one timestamp.
	now := time.Now()

	for i := range stats.CoordinatorStats {
		node := &stats.CoordinatorStats[i]
		tags := map[string]string{"node_id": node.NodeID}
		fields := map[string]interface{}{
			"queue_size":              node.QueueSize,
			"remote_requests_current": node.RemoteRequestsCurrent,
			"remote_requests_total":   node.RemoteRequestsTotal,
			"executed_searches_total": node.ExecutedSearchesTotal,
		}
		acc.AddFields("elasticsearch_enrich_stats_coordinator", fields, tags, now)
	}

	for i := range stats.CacheStats {
		node := &stats.CacheStats[i]
		tags := map[string]string{"node_id": node.NodeID}
		fields := map[string]interface{}{
			"count":     node.Count,
			"hits":      node.Hits,
			"misses":    node.Misses,
			"evictions": node.Evictions,
		}
		acc.AddFields("elasticsearch_enrich_stats_cache", fields, tags, now)
	}

	return nil
}

func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterStats{}
if err := e.gatherJSONData(url, clusterStats); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ func TestGatherIndividualStats(t *testing.T) {
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
}

// TestGatherEnrichStats runs Gather with only the enrich stats collection
// enabled against a mocked transport that serves enrichStatsResponse, and
// checks the number of metrics produced.
func TestGatherEnrichStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.EnrichStats = true
	es.client.Transport = newTransportMock(enrichStatsResponse)
	es.serverInfo = map[string]serverInfo{
		server: defaultServerInfo(),
	}

	var acc testutil.Accumulator
	require.NoError(t, acc.GatherError(es.Gather))
	require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")

	// The fixture lists four coordinator entries and four cache entries,
	// and each entry is reported as one metric.
	require.Len(t, acc.GetTelegrafMetrics(), 8)
}

func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/elasticsearch/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
## For this to work, 'local' must be set to true
cluster_stats_only_from_master = true

## Gather stats from the enrich API
# enrich_stats = false

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
Expand All @@ -57,7 +60,7 @@
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
Expand Down
66 changes: 66 additions & 0 deletions plugins/inputs/elasticsearch/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,72 @@ const clusterHealthResponseWithIndices = `
}
`

// enrichStatsResponse is a sample enrich stats response body used as a test
// fixture. It contains four nodes, each appearing once under
// "coordinator_stats" and once under "cache_stats", plus an empty
// "executing_policies" list. The "hits" values are larger than a 32-bit
// integer can hold.
const enrichStatsResponse = `
{
"executing_policies": [],
"coordinator_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 101636700,
"executed_searches_total": 102230925
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 242051423,
"executed_searches_total": 242752071
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 248009084,
"executed_searches_total": 248735550
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 233693129,
"executed_searches_total": 234476004
}
],
"cache_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"count": 2500,
"hits": 6044497858,
"misses": 102230925,
"evictions": 92663663
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"count": 2500,
"hits": 14640821136,
"misses": 242752071,
"evictions": 226826313
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"count": 2500,
"hits": 14145580115,
"misses": 248735550,
"evictions": 233860968
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"count": 2500,
"hits": 11016000946,
"misses": 234476004,
"evictions": 217698127
}
]
}
`

var clusterHealthExpected = map[string]interface{}{
"status": "green",
"status_code": 1,
Expand Down
Loading