-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add nomad input plugin (#10106)
- Loading branch information
Showing
6 changed files
with
411 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Hashicorp Nomad Input Plugin | ||
|
||
The Nomad plugin must grab metrics from every Nomad agent of the cluster. Telegraf may be present in every node and connect to the agent locally. In this case should be something like `http://127.0.0.1:4646`. | ||
|
||
> Tested on Nomad 1.1.6 | ||
## Configuration | ||
|
||
```toml | ||
[[inputs.nomad]] | ||
## URL for the Nomad agent | ||
# url = "http://127.0.0.1:4646" | ||
|
||
## Set response_timeout (default 5 seconds) | ||
# response_timeout = "5s" | ||
|
||
## Optional TLS Config | ||
# tls_ca = /path/to/cafile | ||
# tls_cert = /path/to/certfile | ||
# tls_key = /path/to/keyfile | ||
``` | ||
|
||
## Metrics | ||
|
||
Both Nomad servers and agents collect various metrics. For every details, please have a look at Nomad following documentation: | ||
|
||
- [https://www.nomadproject.io/docs/operations/metrics](https://www.nomadproject.io/docs/operations/metrics) | ||
- [https://www.nomadproject.io/docs/operations/telemetry](https://www.nomadproject.io/docs/operations/telemetry) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package nomad | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/config" | ||
"github.com/influxdata/telegraf/plugins/common/tls" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
// Nomad configuration object | ||
type Nomad struct { | ||
URL string `toml:"url"` | ||
|
||
ResponseTimeout config.Duration `toml:"response_timeout"` | ||
|
||
tls.ClientConfig | ||
|
||
roundTripper http.RoundTripper | ||
} | ||
|
||
const timeLayout = "2006-01-02 15:04:05 -0700 MST" | ||
|
||
var sampleConfig = ` | ||
## URL for the Nomad agent | ||
# url = "http://127.0.0.1:4646" | ||
## Set response_timeout (default 5 seconds) | ||
# response_timeout = "5s" | ||
## Optional TLS Config | ||
# tls_ca = /path/to/cafile | ||
# tls_cert = /path/to/certfile | ||
# tls_key = /path/to/keyfile | ||
` | ||
|
||
func init() { | ||
inputs.Add("nomad", func() telegraf.Input { | ||
return &Nomad{ | ||
ResponseTimeout: config.Duration(5 * time.Second), | ||
} | ||
}) | ||
} | ||
|
||
// SampleConfig returns a sample config | ||
func (n *Nomad) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns a description of the plugin | ||
func (n *Nomad) Description() string { | ||
return "Read metrics from the Nomad API" | ||
} | ||
|
||
func (n *Nomad) Init() error { | ||
if n.URL == "" { | ||
n.URL = "http://127.0.0.1:4646" | ||
} | ||
|
||
tlsCfg, err := n.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return fmt.Errorf("setting up TLS configuration failed: %v", err) | ||
} | ||
|
||
n.roundTripper = &http.Transport{ | ||
TLSHandshakeTimeout: 5 * time.Second, | ||
TLSClientConfig: tlsCfg, | ||
ResponseHeaderTimeout: time.Duration(n.ResponseTimeout), | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Gather, collects metrics from Nomad endpoint | ||
func (n *Nomad) Gather(acc telegraf.Accumulator) error { | ||
summaryMetrics := &MetricsSummary{} | ||
err := n.loadJSON(n.URL+"/v1/metrics", summaryMetrics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = buildNomadMetrics(acc, summaryMetrics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (n *Nomad) loadJSON(url string, v interface{}) error { | ||
req, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
resp, err := n.roundTripper.RoundTrip(req) | ||
if err != nil { | ||
return fmt.Errorf("error making HTTP request to %s: %s", url, err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) | ||
} | ||
|
||
err = json.NewDecoder(resp.Body).Decode(v) | ||
if err != nil { | ||
return fmt.Errorf("error parsing json response: %s", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// buildNomadMetrics, it builds all the metrics and adds them to the accumulator) | ||
func buildNomadMetrics(acc telegraf.Accumulator, summaryMetrics *MetricsSummary) error { | ||
t, err := time.Parse(timeLayout, summaryMetrics.Timestamp) | ||
if err != nil { | ||
return fmt.Errorf("error parsing time: %s", err) | ||
} | ||
|
||
for _, counters := range summaryMetrics.Counters { | ||
tags := counters.DisplayLabels | ||
|
||
fields := map[string]interface{}{ | ||
"count": counters.Count, | ||
"rate": counters.Rate, | ||
"sum": counters.Sum, | ||
"sumsq": counters.SumSq, | ||
"min": counters.Min, | ||
"max": counters.Max, | ||
"mean": counters.Mean, | ||
} | ||
acc.AddCounter(counters.Name, fields, tags, t) | ||
} | ||
|
||
for _, gauges := range summaryMetrics.Gauges { | ||
tags := gauges.DisplayLabels | ||
|
||
fields := map[string]interface{}{ | ||
"value": gauges.Value, | ||
} | ||
|
||
acc.AddGauge(gauges.Name, fields, tags, t) | ||
} | ||
|
||
for _, points := range summaryMetrics.Points { | ||
tags := make(map[string]string) | ||
|
||
fields := map[string]interface{}{ | ||
"value": points.Points, | ||
} | ||
|
||
acc.AddFields(points.Name, fields, tags, t) | ||
} | ||
|
||
for _, samples := range summaryMetrics.Samples { | ||
tags := samples.DisplayLabels | ||
|
||
fields := map[string]interface{}{ | ||
"count": samples.Count, | ||
"rate": samples.Rate, | ||
"sum": samples.Sum, | ||
"stddev": samples.Stddev, | ||
"sumsq": samples.SumSq, | ||
"min": samples.Min, | ||
"max": samples.Max, | ||
"mean": samples.Mean, | ||
} | ||
acc.AddCounter(samples.Name, fields, tags, t) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package nomad | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
type MetricsSummary struct { | ||
Timestamp string `json:"timestamp"` | ||
Gauges []GaugeValue `json:"gauges"` | ||
Points []PointValue `json:"points"` | ||
Counters []SampledValue `json:"counters"` | ||
Samples []SampledValue `json:"samples"` | ||
} | ||
|
||
type GaugeValue struct { | ||
Name string `json:"name"` | ||
Hash string `json:"-"` | ||
Value float32 `json:"value"` | ||
|
||
Labels []Label `json:"-"` | ||
DisplayLabels map[string]string `json:"Labels"` | ||
} | ||
|
||
type PointValue struct { | ||
Name string `json:"name"` | ||
Points []float32 `json:"points"` | ||
} | ||
|
||
type SampledValue struct { | ||
Name string `json:"name"` | ||
Hash string `json:"-"` | ||
*AggregateSample | ||
Mean float64 `json:"mean"` | ||
Stddev float64 `json:"stddev"` | ||
|
||
Labels []Label `json:"-"` | ||
DisplayLabels map[string]string `json:"Labels"` | ||
} | ||
|
||
type AggregateSample struct { | ||
Count int `json:"count"` | ||
Rate float64 `json:"rate"` | ||
Sum float64 `json:"sum"` | ||
SumSq float64 `json:"-"` | ||
Min float64 `json:"min"` | ||
Max float64 `json:"max"` | ||
LastUpdated time.Time `json:"-"` | ||
} | ||
|
||
type Label struct { | ||
Name string `json:"name"` | ||
Value string `json:"value"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package nomad | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestNomadStats(t *testing.T) { | ||
var applyTests = []struct { | ||
name string | ||
expected []telegraf.Metric | ||
}{ | ||
{ | ||
name: "Metrics", | ||
expected: []telegraf.Metric{ | ||
testutil.MustMetric( | ||
"nomad.nomad.rpc.query", | ||
map[string]string{ | ||
"host": "node1", | ||
}, | ||
map[string]interface{}{ | ||
"count": int(7), | ||
"max": float64(1), | ||
"min": float64(1), | ||
"mean": float64(1), | ||
"rate": float64(0.7), | ||
"sum": float64(7), | ||
"sumsq": float64(0), | ||
}, | ||
time.Unix(1636843140, 0), | ||
1, | ||
), | ||
testutil.MustMetric( | ||
"nomad.client.allocated.cpu", | ||
map[string]string{ | ||
"node_scheduling_eligibility": "eligible", | ||
"host": "node1", | ||
"node_id": "2bbff078-8473-a9de-6c5e-42b4e053e12f", | ||
"datacenter": "dc1", | ||
"node_class": "none", | ||
"node_status": "ready", | ||
}, | ||
map[string]interface{}{ | ||
"value": float32(500), | ||
}, | ||
time.Unix(1636843140, 0), | ||
2, | ||
), | ||
testutil.MustMetric( | ||
"nomad.memberlist.gossip", | ||
map[string]string{ | ||
"host": "node1", | ||
}, | ||
map[string]interface{}{ | ||
"count": int(20), | ||
"max": float64(0.03747599944472313), | ||
"mean": float64(0.013159099989570678), | ||
"min": float64(0.003459000028669834), | ||
"rate": float64(0.026318199979141355), | ||
"stddev": float64(0.009523742715522742), | ||
"sum": float64(0.26318199979141355), | ||
"sumsq": float64(0), | ||
}, | ||
time.Unix(1636843140, 0), | ||
1, | ||
), | ||
}, | ||
}, | ||
} | ||
|
||
for _, tt := range applyTests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.RequestURI == "/v1/metrics" { | ||
w.WriteHeader(http.StatusOK) | ||
responseKeyMetrics, _ := ioutil.ReadFile("testdata/response_key_metrics.json") | ||
_, err := fmt.Fprintln(w, string(responseKeyMetrics)) | ||
require.NoError(t, err) | ||
} | ||
})) | ||
defer ts.Close() | ||
|
||
plugin := &Nomad{ | ||
URL: ts.URL, | ||
} | ||
err := plugin.Init() | ||
require.NoError(t, err) | ||
|
||
acc := testutil.Accumulator{} | ||
err = plugin.Gather(&acc) | ||
require.NoError(t, err) | ||
|
||
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) | ||
}) | ||
} | ||
} |
Oops, something went wrong.