Skip to content

Commit

Permalink
[statsdreceiver] support simple tags (open-telemetry#29012)
Browse files Browse the repository at this point in the history
dogstatsd supports two types of tags on metrics: simple and dimensional
tags[^1]. the former is just a key, the latter is a key and a value.

with the assumption that many users of the statsdreceiver are enabling
ingest of dogstatsd metrics, this makes the statsd parser more
optimistic, so it can handle tags w/o a value. this functionality is
gated behind a new `enable_simple_tags` flag.

when this happens, we set an attribute that has a zero value. so far as
i know, this is allowed in the opentelemetry spec. the decision of how
to handle attributes w/ zero values is best left to configuration w/in
the pipeline itself, as different users may have different opinions or
approaches that work best with their systems.

[^1]:
https://www.datadoghq.com/blog/the-power-of-tagged-metrics/#whats-a-metric-tag

**Testing:**

added coverage to unit tests to enable parsing simple tags.

---------

Co-authored-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
2 people authored and jayasai470 committed Dec 8, 2023
1 parent e6cb791 commit b829af2
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 22 deletions.
28 changes: 28 additions & 0 deletions .chloggen/statsd-support-simple-tags.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "statsdreceiver"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for 'simple' tags that do not have a defined value, to accommodate DogStatsD metrics that may utilize these.


# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29012]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "This functionality is gated behind a new `enable_simple_tags` config boolean, as it is not part of the StatsD spec."

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: ['user']
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ The Following settings are optional:

- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

- `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging.

- `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic.

- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
EnableSimpleTags bool `mapstructure:"enable_simple_tags"`
IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"`
TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"`
}
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/internal/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
GetMetrics() []BatchMetrics
Aggregate(line string, addr net.Addr) error
}
Expand Down
27 changes: 20 additions & 7 deletions receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var defaultObserverCategory = ObserverCategory{
type StatsDParser struct {
instrumentsByAddress map[netAddr]*instruments
enableMetricType bool
enableSimpleTags bool
isMonotonicCounter bool
timerEvents ObserverCategory
histogramEvents ObserverCategory
Expand Down Expand Up @@ -156,12 +157,13 @@ func (p *StatsDParser) resetState(when time.Time) {
p.instrumentsByAddress = make(map[netAddr]*instruments)
}

func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
p.resetState(timeNowFunc())

p.histogramEvents = defaultObserverCategory
p.timerEvents = defaultObserverCategory
p.enableMetricType = enableMetricType
p.enableSimpleTags = enableSimpleTags
p.isMonotonicCounter = isMonotonicCounter
// Note: validation occurs in ("../".Config).validate()
for _, eachMap := range sendTimerHistogram {
Expand Down Expand Up @@ -270,7 +272,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {

// Aggregate for each metric line.
func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType)
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,7 +351,7 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
return nil
}

func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) {
func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) {
result := statsDMetric{}

parts := strings.Split(line, "|")
Expand Down Expand Up @@ -410,11 +412,22 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err

for _, tagSet := range tagSets {
tagParts := strings.SplitN(tagSet, ":", 2)
if len(tagParts) != 2 {
return result, fmt.Errorf("invalid tag format: %s", tagParts)
}
k := tagParts[0]
v := tagParts[1]
if k == "" {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
}

// support both simple tags (w/o value) and dimension tags (w/ value).
// dogstatsd notably allows simple tags.
var v string
if len(tagParts) == 2 {
v = tagParts[1]
}

if v == "" && !enableSimpleTags {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
}

kvs = append(kvs, attribute.String(k, v))
}
default:
Expand Down
87 changes: 73 additions & 14 deletions receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func Test_ParseMessageToMetric(t *testing.T) {
},
{
name: "invalid tag format",
input: "test.metric:42|c|#key1",
err: errors.New("invalid tag format: [key1]"),
input: "test.metric:42|c|#:val1",
err: errors.New("invalid tag format: \":val1\""),
},
{
name: "unrecognized message part",
Expand Down Expand Up @@ -235,7 +235,7 @@ func Test_ParseMessageToMetric(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, false)
got, err := parseMessageToMetric(tt.input, false, false)

if tt.err != nil {
assert.Equal(t, tt.err, err)
Expand Down Expand Up @@ -433,7 +433,66 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, true)
got, err := parseMessageToMetric(tt.input, true, false)

if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantMetric, got)
}
})
}
}

func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) {
tests := []struct {
name string
input string
wantMetric statsDMetric
err error
}{
{
name: "counter metric with sample rate and (dimensional) tag",
input: "test.metric:42|c|@0.1|#key:value",
wantMetric: testStatsDMetric(
"test.metric",
42,
false,
"c",
0.1,
[]string{"key"},
[]string{"value"}),
},
{
name: "counter metric with sample rate and (simple) tag",
input: "test.metric:42|c|@0.1|#key",
wantMetric: testStatsDMetric(
"test.metric",
42,
false,
"c",
0.1,
[]string{"key"},
[]string{""}),
},
{
name: "counter metric with sample rate and two (simple) tags",
input: "test.metric:42|c|@0.1|#key,key2",
wantMetric: testStatsDMetric(
"test.metric",
42,
false,
"c",
0.1,
[]string{"key", "key2"},
[]string{"", ""}),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, false, true)

if tt.err != nil {
assert.Equal(t, tt.err, err)
Expand Down Expand Up @@ -677,7 +736,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
p.lastIntervalTime = time.Unix(611, 0)
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addrKey := newNetAddr(addr)
Expand Down Expand Up @@ -746,7 +805,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &StatsDParser{}
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
p.lastIntervalTime = time.Unix(611, 0)
for i, addr := range tt.addresses {
for _, line := range tt.input[i] {
Expand Down Expand Up @@ -814,7 +873,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
p.lastIntervalTime = time.Unix(611, 0)
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addrKey := newNetAddr(addr)
Expand Down Expand Up @@ -864,7 +923,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
p.lastIntervalTime = time.Unix(611, 0)
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addrKey := newNetAddr(addr)
Expand Down Expand Up @@ -986,7 +1045,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addrKey := newNetAddr(addr)
for _, line := range tt.input {
Expand All @@ -1003,7 +1062,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) {

func TestStatsDParser_Initialize(t *testing.T) {
p := &StatsDParser{}
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
teststatsdDMetricdescription := statsDMetricDescription{
name: "test",
metricType: "g",
Expand All @@ -1022,7 +1081,7 @@ func TestStatsDParser_Initialize(t *testing.T) {

func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) {
p := &StatsDParser{}
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
instrument := newInstruments(nil)
instrument.gauges[testDescription("statsdTestMetric1", "g",
[]string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric(testStatsDMetric("testGauge1", 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0))
Expand Down Expand Up @@ -1095,7 +1154,7 @@ func TestStatsDParser_Mappings(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
p := &StatsDParser{}

assert.NoError(t, p.Initialize(false, false, tc.mapping))
assert.NoError(t, p.Initialize(false, false, false, tc.mapping))

addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
assert.NoError(t, p.Aggregate("H:10|h", addr))
Expand Down Expand Up @@ -1129,7 +1188,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) {
}
testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")

err := p.Initialize(true, false,
err := p.Initialize(true, false, false,
[]TimerHistogramMapping{
{StatsdType: "timer", ObserverType: "summary"},
{StatsdType: "histogram", ObserverType: "histogram"},
Expand Down Expand Up @@ -1399,7 +1458,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, tt.mapping))
assert.NoError(t, p.Initialize(false, false, false, tt.mapping))
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
for _, line := range tt.input {
err = p.Aggregate(line, addr)
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
ticker := time.NewTicker(r.config.AggregationInterval)
err = r.parser.Initialize(
r.config.EnableMetricType,
r.config.EnableSimpleTags,
r.config.IsMonotonicCounter,
r.config.TimerHistogramMapping,
)
Expand Down

0 comments on commit b829af2

Please sign in to comment.