Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nomad input plugin #10106

Merged
merged 14 commits into from
Dec 8, 2021
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_sts"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_upstream_check"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_vts"
_ "github.com/influxdata/telegraf/plugins/inputs/nomad"
_ "github.com/influxdata/telegraf/plugins/inputs/nsd"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
Expand Down
39 changes: 39 additions & 0 deletions plugins/inputs/nomad/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Nomad Input Plugin

The Nomad plugin must grab metrics from every Nomad agent of the cluster. Telegraf may run on every node and connect to the agent locally; in that case the URL should be something like `http://127.0.0.1:4646`.

> Tested on Nomad 1.1.6

### Configuration

```toml
[[inputs.nomad]]
## URL for the Nomad agent
url = "http://127.0.0.1:4646"

## Use auth token for authorization. ('auth_token' takes priority)
## If both of these are empty, no token will be used.
# auth_token = "/path/to/auth/token"
## OR
# auth_token_string = "a1234567-40c7-9048-7bae-378687048181"

## Labels to be added as tags. An empty array for both include and
## exclude will include all labels.
# label_include = []
# label_exclude = ["*"]

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

## Optional TLS Config
# tls_ca = "/path/to/cafile"
# tls_cert = "/path/to/certfile"
# tls_key = "/path/to/keyfile"
```


### Metrics

Both Nomad servers and agents collect various metrics. For details, please see the Nomad documentation:
- [https://www.nomadproject.io/docs/operations/metrics](https://www.nomadproject.io/docs/operations/metrics)
- [https://www.nomadproject.io/docs/operations/telemetry](https://www.nomadproject.io/docs/operations/telemetry)
215 changes: 215 additions & 0 deletions plugins/inputs/nomad/nomad.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package nomad

import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Nomad struct {
URL string
srebhan marked this conversation as resolved.
Show resolved Hide resolved

AuthToken string `toml:"auth_token"`
AuthTokenString string `toml:"auth_token_string"`

LabelInclude []string `toml:"label_include"`
LabelExclude []string `toml:"label_exclude"`

labelFilter filter.Filter

ResponseTimeout config.Duration
srebhan marked this conversation as resolved.
Show resolved Hide resolved

tls.ClientConfig

RoundTripper http.RoundTripper
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

// timeLayout is the time.Parse layout used for the Timestamp field of the
// metrics summary.
const timeLayout = "2006-01-02 15:04:05 -0700 MST"

// sampleConfig is the example TOML configuration returned by SampleConfig.
// String values (including the TLS paths) are quoted so that every option is
// valid TOML once it is uncommented.
var sampleConfig = `
## URL for the Nomad agent
url = "http://127.0.0.1:4646"

## Use auth token for authorization. ('auth_token' takes priority)
## If both of these are empty, no token will be used.
# auth_token = "/path/to/auth/token"
## OR
# auth_token_string = "a1234567-40c7-9048-7bae-378687048181"

## Labels to be added as tags. An empty array for both include and
## exclude will include all labels.
# label_include = []
# label_exclude = ["*"]

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

## Optional TLS Config
# tls_ca = "/path/to/cafile"
# tls_cert = "/path/to/certfile"
# tls_key = "/path/to/keyfile"
`

func init() {
inputs.Add("nomad", func() telegraf.Input {
return &Nomad{
LabelInclude: []string{},
LabelExclude: []string{"*"},
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}
})
}

// SampleConfig returns the example configuration for the plugin.
func (*Nomad) SampleConfig() string {
	return sampleConfig
}

func (n *Nomad) Description() string {
return "Read metrics from the Nomad api"
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *Nomad) Init() error {
if n.AuthToken == "" && n.AuthTokenString == "" {
n.AuthToken = ""
}

srebhan marked this conversation as resolved.
Show resolved Hide resolved
if n.AuthToken != "" {
token, err := os.ReadFile(n.AuthToken)
if err != nil {
return err
}
n.AuthTokenString = strings.TrimSpace(string(token))
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved

labelFilter, err := filter.NewIncludeExcludeFilter(n.LabelInclude, n.LabelExclude)
if err != nil {
return err
srebhan marked this conversation as resolved.
Show resolved Hide resolved
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}
n.labelFilter = labelFilter

return nil
}

// Gather collects the metrics summary from the configured agent. A failure is
// reported through the accumulator rather than returned, so Gather itself
// always returns nil.
func (n *Nomad) Gather(acc telegraf.Accumulator) error {
	summaryErr := n.gatherSummary(n.URL, acc)
	acc.AddError(summaryErr)

	return nil
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved

func (n *Nomad) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
summaryMetrics := &MetricsSummary{}
err := n.LoadJSON(fmt.Sprintf("%s/v1/metrics", baseURL), summaryMetrics)
if err != nil {
return err
}

buildNomadMetrics(summaryMetrics, acc)

return nil
}

func (n *Nomad) LoadJSON(url string, v interface{}) error {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
var req, err = http.NewRequest("GET", url, nil)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
var resp *http.Response
tlsCfg, err := n.ClientConfig.TLSConfig()
if err != nil {
return err
}
if n.RoundTripper == nil {
if n.ResponseTimeout < config.Duration(time.Second) {
n.ResponseTimeout = config.Duration(time.Second * 5)
}
n.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: time.Duration(n.ResponseTimeout),
}
}
req.Header.Set("Authorization", "X-Nomad-Token "+n.AuthTokenString)
req.Header.Add("Accept", "application/json")
srebhan marked this conversation as resolved.
Show resolved Hide resolved
resp, err = n.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(v)
if err != nil {
return fmt.Errorf(`error parsing response: %s`, err)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

func buildNomadMetrics(summaryMetrics *MetricsSummary, acc telegraf.Accumulator) {

srebhan marked this conversation as resolved.
Show resolved Hide resolved
t, err := time.Parse(timeLayout, summaryMetrics.Timestamp)
if err != nil {
panic(err)
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved
sampledValueFields := make(map[string]interface{})

for c, counters := range summaryMetrics.Counters {
tags := summaryMetrics.Counters[c].DisplayLabels
srebhan marked this conversation as resolved.
Show resolved Hide resolved

sampledValueFields["count"] = counters.Count
sampledValueFields["rate"] = counters.Rate
sampledValueFields["sum"] = counters.Sum
sampledValueFields["sumsq"] = counters.SumSq
sampledValueFields["min"] = counters.Min
sampledValueFields["max"] = counters.Max
sampledValueFields["mean"] = counters.Mean

acc.AddCounter(counters.Name, sampledValueFields, tags, t)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

for c, gauges := range summaryMetrics.Gauges {
tags := summaryMetrics.Gauges[c].DisplayLabels
srebhan marked this conversation as resolved.
Show resolved Hide resolved

fields := make(map[string]interface{})
fields["value"] = gauges.Value

t, err := time.Parse(timeLayout, summaryMetrics.Timestamp)
if err != nil {
panic(err)
}
acc.AddGauge(gauges.Name, fields, tags, t)
}

for _, points := range summaryMetrics.Points {
tags := make(map[string]string)

fields := make(map[string]interface{})
fields["value"] = points.Points
srebhan marked this conversation as resolved.
Show resolved Hide resolved

acc.AddFields(points.Name, fields, tags, t)
}

for c, samples := range summaryMetrics.Samples {
tags := summaryMetrics.Samples[c].DisplayLabels
srebhan marked this conversation as resolved.
Show resolved Hide resolved

sampledValueFields := make(map[string]interface{})
sampledValueFields["count"] = samples.Count
sampledValueFields["rate"] = samples.Rate
sampledValueFields["sum"] = samples.Sum
sampledValueFields["sumsq"] = samples.SumSq
sampledValueFields["min"] = samples.Min
sampledValueFields["max"] = samples.Max
sampledValueFields["mean"] = samples.Mean

acc.AddCounter(samples.Name, sampledValueFields, tags, t)
}
}
53 changes: 53 additions & 0 deletions plugins/inputs/nomad/nomad_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package nomad

import (
"time"
)

// Types describing the JSON document decoded from the agent's metrics
// endpoint.
type (
	// MetricsSummary is the top-level document: a timestamp string plus the
	// gauges, points, counters and samples reported at that time.
	MetricsSummary struct {
		Timestamp string
		Gauges    []GaugeValue
		Points    []PointValue
		Counters  []SampledValue
		Samples   []SampledValue
	}

	// GaugeValue is a single gauge reading. The JSON key "Labels" is decoded
	// into DisplayLabels; Hash and Labels are excluded from JSON.
	GaugeValue struct {
		Name  string
		Hash  string `json:"-"`
		Value float32

		Labels        []Label           `json:"-"`
		DisplayLabels map[string]string `json:"Labels"`
	}

	// PointValue is a named series of raw point values.
	PointValue struct {
		Name   string
		Points []float32
	}

	// SampledValue is a named aggregate (counter or sample) with its mean and
	// standard deviation. The aggregate fields come from the embedded
	// *AggregateSample; the JSON key "Labels" is decoded into DisplayLabels.
	SampledValue struct {
		Name string
		Hash string `json:"-"`
		*AggregateSample
		Mean   float64
		Stddev float64

		Labels        []Label           `json:"-"`
		DisplayLabels map[string]string `json:"Labels"`
	}

	// AggregateSample carries the aggregate statistics of a sampled value.
	// SumSq and LastUpdated are excluded from JSON.
	AggregateSample struct {
		Count       int
		Rate        float64
		Sum         float64
		SumSq       float64 `json:"-"`
		Min         float64
		Max         float64
		LastUpdated time.Time `json:"-"`
	}

	// Label is a name/value pair.
	Label struct {
		Name  string
		Value string
	}
)
Loading