Skip to content

Commit

Permalink
Add prometheus metric_version = 2 and url tag configurable (#5767)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishiy authored and danielnelson committed Nov 21, 2019
1 parent c12a403 commit 12ecdab
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 4 deletions.
18 changes: 18 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ in Prometheus format.
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

Expand Down Expand Up @@ -140,3 +143,18 @@ cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641
cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000
cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000
```

**Output (when metric_version = 2)**
```
prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000
prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000
prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000
prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000
prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000
prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000
prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000
prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000
prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000
```
158 changes: 158 additions & 0 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,145 @@ import (
"github.com/prometheus/common/expfmt"
)

// Parse returns a slice of Metrics from a text representation of a
// metrics
func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)

mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)

if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}

// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)

if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType())
metrics = append(metrics, telegrafMetrics...)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType())
metrics = append(metrics, telegrafMetrics...)
} else {
// standard metric
// reading fields
fields := make(map[string]interface{})
fields = getNameAndValueV2(m, metricName)
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
}

return metrics, err
}

// Get Quantiles for summary metric & Buckets for histogram
func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}

for _, q := range m.GetSummary().Quantile {
newTags := tags
fields = make(map[string]interface{})
if !math.IsNaN(q.GetValue()) {
newTags["quantile"] = fmt.Sprint(q.GetQuantile())
fields[metricName] = float64(q.GetValue())

quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, quantileMetric)
}
}
}
return metrics
}

// Get Buckets from histogram metric
func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())

met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}

for _, b := range m.GetHistogram().Bucket {
newTags := tags
fields = make(map[string]interface{})
newTags["le"] = fmt.Sprint(b.GetUpperBound())
fields[metricName+"_bucket"] = float64(b.GetCumulativeCount())

histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, histogramMetric)
}
}
return metrics
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
Expand Down Expand Up @@ -159,3 +298,22 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
}
return fields
}

// Get name and value from metric
func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields[metricName] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields[metricName] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields[metricName] = float64(m.GetUntyped().GetValue())
}
}
return fields
}
21 changes: 19 additions & 2 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Prometheus struct {

ResponseTimeout internal.Duration `toml:"response_timeout"`

MetricVersion int `toml:"metric_version"`

URLTag string `toml:"url_tag"`

tls.ClientConfig

Log telegraf.Logger
Expand All @@ -58,6 +62,12 @@ var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2
## Url tag name (tag containing scrapped url. optional, default is "url")
# url_tag = "scrapeUrl"
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
Expand Down Expand Up @@ -224,6 +234,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
var req *http.Request
var err error
var uClient *http.Client
var metrics []telegraf.Metric
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
Expand Down Expand Up @@ -285,7 +296,12 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
return fmt.Errorf("error reading body: %s", err)
}

metrics, err := Parse(body, resp.Header)
if p.MetricVersion == 2 {
metrics, err = ParseV2(body, resp.Header)
} else {
metrics, err = Parse(body, resp.Header)
}

if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
u.URL, err)
Expand All @@ -295,7 +311,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
tags["url"] = u.OriginalURL.String()
tags[p.URLTag] = u.OriginalURL.String()
if u.Address != "" {
tags["address"] = u.Address
}
Expand Down Expand Up @@ -342,6 +358,7 @@ func init() {
return &Prometheus{
ResponseTimeout: internal.Duration{Duration: time.Second * 3},
kubernetesPods: map[string]URLAndAddress{},
URLTag: "url",
}
})
}
67 changes: 65 additions & 2 deletions plugins/inputs/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ go_goroutines 15
# TYPE test_metric untyped
test_metric{label="value"} 1.0 1490802350000
`
const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
`
const sampleGaugeTextFormat = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15 1490802350000
`

func TestPrometheusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -37,8 +52,9 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
defer ts.Close()

p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
}

var acc testutil.Accumulator
Expand All @@ -63,6 +79,7 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
URLTag: "url",
}
u, _ := url.Parse(ts.URL)
tsAddress := u.Hostname()
Expand Down Expand Up @@ -106,3 +123,49 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
}

func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleSummaryTextFormat)
}))
defer ts.Close()

p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}

var acc testutil.Accumulator

err := acc.GatherError(p.Gather)
require.NoError(t, err)

assert.True(t, acc.TagSetValue("prometheus", "quantile") == "0")
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum"))
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")

}

func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleGaugeTextFormat)
}))
defer ts.Close()

p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}

var acc testutil.Accumulator

err := acc.GatherError(p.Gather)
require.NoError(t, err)

assert.True(t, acc.HasFloatField("prometheus", "go_goroutines"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
}
12 changes: 12 additions & 0 deletions testutil/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ func (a *Accumulator) HasTag(measurement string, key string) bool {
return false
}

func (a *Accumulator) TagSetValue(measurement string, key string) string {
for _, p := range a.Metrics {
if p.Measurement == measurement {
v, ok := p.Tags[key]
if ok {
return v
}
}
}
return ""
}

func (a *Accumulator) TagValue(measurement string, key string) string {
for _, p := range a.Metrics {
if p.Measurement == measurement {
Expand Down

0 comments on commit 12ecdab

Please sign in to comment.