Skip to content

Commit

Permalink
Prometheus parser fix, parse headers properly
Browse files Browse the repository at this point in the history
closes #1458
  • Loading branch information
sparrc committed Jul 9, 2016
1 parent c046232 commit c0d020a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 72 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## v1.0
## v1.0 [unreleased]

### Release Notes

Expand Down Expand Up @@ -42,6 +42,7 @@ should now look like:
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.

## v1.0 beta 2 [2016-06-21]

Expand Down
46 changes: 5 additions & 41 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"math"
"mime"
"net/http"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -19,17 +20,9 @@ import (
"github.com/prometheus/common/expfmt"
)

// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
Expand All @@ -38,17 +31,16 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)

// Get format
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
metricFamily := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil {
if err == io.EOF {
if _, readerr := pbutil.ReadDelimited(reader, metricFamily); readerr != nil {
if readerr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
Expand All @@ -65,11 +57,6 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
Expand Down Expand Up @@ -106,29 +93,6 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, err
}

// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}

return metrics[0], nil
}

/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/

// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
Expand Down
22 changes: 6 additions & 16 deletions plugins/inputs/prometheus/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prometheus

import (
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}

// Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge))
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
Expand All @@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) {
}, metrics[0].Tags())

// Counter value
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"})
metrics, err = parser.Parse([]byte(validUniqueCounter))
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
Expand All @@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{}, metrics[0].Tags())

// Summary data
//parser.SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary))
//SetDefaultTags(map[string]string{})
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
Expand All @@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())

// histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram))
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
Expand All @@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) {
metrics[0].Tags())

}

func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)

}
16 changes: 2 additions & 14 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ var client = &http.Client{
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now()
var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header)
var token []byte
var resp *http.Response

Expand Down Expand Up @@ -129,20 +128,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
return fmt.Errorf("error reading body: %s", err)
}

// Headers
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}

// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}

metrics, err := promparser.Parse(body)
metrics, err := Parse(body, resp.Header)
if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s",
return fmt.Errorf("error reading metrics for %s: %s",
url, err)
}
// Add (or not) collected metrics
Expand Down

0 comments on commit c0d020a

Please sign in to comment.