Skip to content

Commit

Permalink
Opt-in support for measurement and tag templates in Dropwizard parser
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Jan 5, 2018
1 parent 46412c2 commit e221c75
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 9 deletions.
20 changes: 18 additions & 2 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ You can also change the path to the typesdb or add additional typesdb using

# Dropwizard:

The dropwizard format can parse the JSON representation of a single dropwizard metric registry. Tags are parsed from metric names as if they were actual influxdb line protocol keys (`measurement<,tag_set>`) and all field values are collected as float64 fields.
The dropwizard format can parse the JSON representation of a single dropwizard metric registry. By default, tags are parsed from metric names as if they were actual influxdb line protocol keys (`measurement<,tag_set>`) which can be overriden by defining custom [measurement & tag templates](./DATA_FORMATS_INPUT.md#measurement--tag-templates). All field values are collected as float64 fields.

A typical JSON of a dropwizard metric registry:

Expand Down Expand Up @@ -559,7 +559,7 @@ measurement,metric_type=histogram count=1,max=1.0,mean=1.0,min=1.0,p50=1.0,p75=1
measurement,metric_type=timer count=1,max=1.0,mean=1.0,min=1.0,p50=1.0,p75=1.0,p95=1.0,p98=1.0,p99=1.0,p999=1.0,stddev=1.0,m15_rate=1.0,m1_rate=1.0,m5_rate=1.0,mean_rate=1.0
```

You may also parse a dropwizard registry from any JSON document which contains a dropwizard registry in some field.
You may also parse a dropwizard registry from any JSON document which contains a dropwizard registry in some inner field.
Eg. to parse the following JSON document:

```json
Expand Down Expand Up @@ -619,6 +619,21 @@ For more information about the dropwizard json format see
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "dropwizard"

## Used by the templating engine to join matched values when cardinality is > 1
separator = "_"

## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. There can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag(s)
## 3. filter + template with field key
## 4. default template
## By providing an empty template array, templating is disabled and measurements are parsed as influxdb line protocol keys (measurement<,tag_set>)
templates = []

## You may use an appropriate [gjson path](https://github.com/tidwall/gjson#path-syntax)
## to locate the metric registry within the JSON document
# dropwizard_metric_registry_path = "metrics"
Expand All @@ -636,4 +651,5 @@ For more information about the dropwizard json format see
# [inputs.exec.dropwizard_tag_paths]
# tag1 = "tags.tag1"
# tag2 = "tags.tag2"

```
2 changes: 1 addition & 1 deletion internal/templating/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Engine struct {
}

// Apply extracts the template fields from the given line and returns the measurement
// name and tags
// name, tags and field name
func (e *Engine) Apply(line string) (string, map[string]string, string, error) {
return e.matcher.match(line).Apply(line, e.joiner)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/templating/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Template struct {
}

// apply extracts the template fields from the given line and returns the measurement
// name and tags
// name, tags and field name
func (t *Template) Apply(line string, joiner string) (string, map[string]string, string, error) {
fields := strings.Split(line, t.separator)
var (
Expand Down
Binary file added plugins/parsers/dropwizard/debug.test
Binary file not shown.
47 changes: 44 additions & 3 deletions plugins/parsers/dropwizard/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/gjson"
)
Expand Down Expand Up @@ -41,9 +42,11 @@ type Parser struct {
// an optional map of default tags to use for metrics
DefaultTags map[string]string

// templating similar to graphite
// templating configuration
Separator string
Templates []string

templateEngine *templating.Engine
}

// Parse parses the input bytes to an array of metrics
Expand Down Expand Up @@ -91,6 +94,17 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

// InitTemplating initializes the templating support
func (p *Parser) InitTemplating() error {
if len(p.Templates) > 0 {
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
p.templateEngine = templateEngine
return err
}
return nil
}

// ParseLine is not supported by the dropwizard format
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
Expand Down Expand Up @@ -170,19 +184,38 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
case map[string]interface{}:
var metricsBuffer bytes.Buffer
for dwmName, dwmFields := range dwmsTyped {
measurementName := dwmName
tags := make(map[string]string)
fieldPrefix := ""
if p.templateEngine != nil {
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
if len(fieldPrefix) > 0 {
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator)
}
}
tags["metric_type"] = metricType

measurementWithTags := measurementName
for tagName, tagValue := range tags {
tagKeyValue := fmt.Sprintf("%s=%s", tagName, tagValue)
measurementWithTags = fmt.Sprintf("%s,%s", measurementWithTags, tagKeyValue)
}

fields := make([]string, 0)
switch t := dwmFields.(type) {
case map[string]interface{}: // json object
for fieldName, fieldValue := range t {

switch v := fieldValue.(type) {
case float64:
fields = append(fields, fmt.Sprintf("%s=%f", fieldName, v))
fields = append(fields, fmt.Sprintf("%s%s=%f", fieldPrefix, fieldName, v))
default: // ignore
}
}
default: // ignore
}
metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", dwmName, metricType))

metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", measurementWithTags, metricType))
metricsBuffer.WriteString(strings.Join(fields, ","))
metricsBuffer.WriteString("\n")
}
Expand All @@ -196,3 +229,11 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
}

}

func arraymap(vs []string, f func(string) string) []string {
vsm := make([]string, len(vs))
for i, v := range vs {
vsm[i] = f(v)
}
return vsm
}
109 changes: 109 additions & 0 deletions plugins/parsers/dropwizard/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dropwizard
import (
"testing"

"github.com/influxdata/telegraf"

"fmt"
"time"

Expand Down Expand Up @@ -374,3 +376,110 @@ func TestTagParsingProblems(t *testing.T) {
assert.Len(t, metrics2, 1)
assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags())
}

// sampleTemplateJSON is a sample json document containing metrics to be tested against the templating engine.
const sampleTemplateJSON = `
{
"version": "3.0.0",
"counters" : {},
"meters" : {},
"gauges" : {
"vm.memory.heap.committed" : { "value" : 1 },
"vm.memory.heap.init" : { "value" : 2 },
"vm.memory.heap.max" : { "value" : 3 },
"vm.memory.heap.usage" : { "value" : 4 },
"vm.memory.heap.used" : { "value" : 5 },
"vm.memory.non-heap.committed" : { "value" : 6 },
"vm.memory.non-heap.init" : { "value" : 7 },
"vm.memory.non-heap.max" : { "value" : 8 },
"vm.memory.non-heap.usage" : { "value" : 9 },
"vm.memory.non-heap.used" : { "value" : 10 }
},
"histograms" : {
"jenkins.job.building.duration" : {
"count" : 1,
"max" : 2,
"mean" : 3,
"min" : 4,
"p50" : 5,
"p75" : 6,
"p95" : 7,
"p98" : 8,
"p99" : 9,
"p999" : 10,
"stddev" : 11
}
},
"timers" : {}
}
`

func TestParseSampleTemplateJSON(t *testing.T) {
parser := Parser{
Separator: "_",
Templates: []string{
"jenkins.* measurement.metric.metric.field",
"vm.* measurement.measurement.pool.field",
},
}
parser.InitTemplating()

metrics, err := parser.Parse([]byte(sampleTemplateJSON))
assert.NoError(t, err)

assert.Len(t, metrics, 11)

jenkinsMetric := search(metrics, "jenkins", nil, "")
assert.NotNil(t, jenkinsMetric, "the metrics should contain a jenkins measurement")
assert.Equal(t, map[string]interface{}{
"duration_count": float64(1),
"duration_max": float64(2),
"duration_mean": float64(3),
"duration_min": float64(4),
"duration_p50": float64(5),
"duration_p75": float64(6),
"duration_p95": float64(7),
"duration_p98": float64(8),
"duration_p99": float64(9),
"duration_p999": float64(10),
"duration_stddev": float64(11),
}, jenkinsMetric.Fields())
assert.Equal(t, map[string]string{"metric_type": "histogram", "metric": "job_building"}, jenkinsMetric.Tags())

vmMemoryHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "heap"}, "committed_value")
assert.NotNil(t, vmMemoryHeapCommitted)
assert.Equal(t, map[string]interface{}{
"committed_value": float64(1),
}, vmMemoryHeapCommitted.Fields())
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "heap"}, vmMemoryHeapCommitted.Tags())

vmMemoryNonHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "non-heap"}, "committed_value")
assert.NotNil(t, vmMemoryNonHeapCommitted)
assert.Equal(t, map[string]interface{}{
"committed_value": float64(6),
}, vmMemoryNonHeapCommitted.Fields())
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "non-heap"}, vmMemoryNonHeapCommitted.Tags())
}

func search(metrics []telegraf.Metric, name string, tags map[string]string, fieldName string) telegraf.Metric {
for _, v := range metrics {
if v.Name() == name && containsAll(v.Tags(), tags) {
if len(fieldName) == 0 {
return v
}
if _, ok := v.Fields()[fieldName]; ok {
return v
}
}
}
return nil
}

func containsAll(t1 map[string]string, t2 map[string]string) bool {
for k, v := range t2 {
if foundValue, ok := t1[k]; !ok || v != foundValue {
return false
}
}
return true
}
12 changes: 10 additions & 2 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func NewParser(config *Config) (Parser, error) {
config.CollectdSecurityLevel, config.CollectdTypesDB)
case "dropwizard":
parser, err = NewDropwizardParser(config.DropwizardMetricRegistryPath,
config.DropwizardTimePath, config.DropwizardTimeFormat, config.DropwizardTagsPath, config.DropwizardTagPathsMap, config.DefaultTags)
config.DropwizardTimePath, config.DropwizardTimeFormat, config.DropwizardTagsPath, config.DropwizardTagPathsMap, config.DefaultTags,
config.Separator, config.Templates)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand Down Expand Up @@ -171,6 +172,9 @@ func NewDropwizardParser(
tagsPath string,
tagPathsMap map[string]string,
defaultTags map[string]string,
separator string,
templates []string,

) (Parser, error) {
parser := &dropwizard.Parser{
MetricRegistryPath: metricRegistryPath,
Expand All @@ -179,6 +183,10 @@ func NewDropwizardParser(
TagsPath: tagsPath,
TagPathsMap: tagPathsMap,
DefaultTags: defaultTags,
Separator: separator,
Templates: templates,
}
return parser, nil
err := parser.InitTemplating()

return parser, err
}

0 comments on commit e221c75

Please sign in to comment.