Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to parse datadog-formatted tags in the statsd input #927

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.11.2 [unreleased]

### Features
- [#927](https://github.com/influxdata/telegraf/pull/927): Adds parsing of tags to the statsd input when using DataDog's dogstatsd extension
- [#863](https://github.com/influxdata/telegraf/pull/863): AMQP output: allow external auth. Thanks @ekini!
- [#707](https://github.com/influxdata/telegraf/pull/707): Improved prometheus plugin. Thanks @titilambert!
- [#878](https://github.com/influxdata/telegraf/pull/878): Added json serializer. Thanks @ch3lo!
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true

## Parses tags in DataDog's dogstatsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false

## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
Expand Down Expand Up @@ -155,6 +159,7 @@ per-measurement in the calculation of percentiles. Raising this limit increases
the accuracy of percentiles but also increases the memory usage and cpu time.
- **templates** []string: Templates for transforming statsd buckets into influx
measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)

### Statsd bucket -> InfluxDB line-protocol Templates

Expand Down
51 changes: 51 additions & 0 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Statsd struct {
DeleteTimings bool
ConvertNames bool

// This flag enables parsing of tags in the dogstatsd extention to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool

// UDPPacketSize is the size of the read packets for the server listening
// for statsd UDP packets. This will default to 1500 bytes.
UDPPacketSize int `toml:"udp_packet_size"`
Expand Down Expand Up @@ -148,6 +152,10 @@ const sampleConfig = `
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true

## Parses tags in the datadog statsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false

## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
Expand Down Expand Up @@ -318,6 +326,43 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

lineTags := make(map[string]string)
if s.ParseDataDogTags {
recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
// users.online:1|c|#sometagwithnovalue
// we will split on the pipe and remove any elements that are datadog
// tags, parse them, and rebuild the line sans the datadog tags
pipesplit := strings.Split(line, "|")
for _, segment := range pipesplit {
if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma separated
tagstr := segment[1:]
tags := strings.Split(tagstr, ",")
for _, tag := range tags {
ts := strings.Split(tag, ":")
var k, v string
switch len(ts) {
case 1:
// just a tag
k = ts[0]
v = ""
case 2:
k = ts[0]
v = ts[1]
}
if k != "" {
lineTags[k] = v
}
}
} else {
recombinedSegments = append(recombinedSegments, segment)
}
}
line = strings.Join(recombinedSegments, "|")
}

// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
Expand Down Expand Up @@ -415,6 +460,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
m.tags["metric_type"] = "histogram"
}

if len(lineTags) > 0 {
for k, v := range lineTags {
m.tags[k] = v
}
}

// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
Expand Down
78 changes: 78 additions & 0 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,84 @@ func TestParse_Tags(t *testing.T) {
}
}

// Test that DataDog tags are parsed
func TestParse_DataDogTags(t *testing.T) {
s := NewStatsd()
s.ParseDataDogTags = true

lines := []string{
"my_counter:1|c|#host:localhost,environment:prod",
"my_gauge:10.1|g|#live",
"my_set:1|s|#host:localhost",
"my_timer:3|ms|@0.1|#live,host:localhost",
}

testTags := map[string]map[string]string{
"my_counter": map[string]string{
"host": "localhost",
"environment": "prod",
},

"my_gauge": map[string]string{
"live": "",
},

"my_set": map[string]string{
"host": "localhost",
},

"my_timer": map[string]string{
"live": "",
"host": "localhost",
},
}

for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

sourceTags := map[string]map[string]string{
"my_gauge": tagsForItem(s.gauges),
"my_counter": tagsForItem(s.counters),
"my_set": tagsForItem(s.sets),
"my_timer": tagsForItem(s.timings),
}

for statName, tags := range testTags {
for k, v := range tags {
otherValue := sourceTags[statName][k]
if sourceTags[statName][k] != v {
t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue)
}
}
}
}

func tagsForItem(m interface{}) map[string]string {
switch m.(type) {
case map[string]cachedcounter:
for _, v := range m.(map[string]cachedcounter) {
return v.tags
}
case map[string]cachedgauge:
for _, v := range m.(map[string]cachedgauge) {
return v.tags
}
case map[string]cachedset:
for _, v := range m.(map[string]cachedset) {
return v.tags
}
case map[string]cachedtimings:
for _, v := range m.(map[string]cachedtimings) {
return v.tags
}
}
return nil
}

// Test that statsd buckets are parsed to measurement names properly
func TestParseName(t *testing.T) {
s := NewStatsd()
Expand Down