From b829af21c57a9702079273eed6c38538f18b13db Mon Sep 17 00:00:00 2001 From: Jason Anderson <796413+diurnalist@users.noreply.github.com> Date: Wed, 6 Dec 2023 15:20:40 -0600 Subject: [PATCH] [statsdreceiver] support simple tags (#29012) DogStatsD supports two types of tags on metrics: simple and dimensional tags[^1]. The former is just a key; the latter is a key and a value. On the assumption that many users of the statsdreceiver ingest DogStatsD metrics, this makes the StatsD parser more lenient, so it can handle tags without a value. This functionality is gated behind a new `enable_simple_tags` flag. When the flag is enabled and a tag has no value, we set an attribute whose value is the empty string. So far as I know, this is allowed by the OpenTelemetry spec. The decision of how to handle attributes with empty values is best left to configuration within the pipeline itself, as different users may have different opinions or approaches that work best with their systems. [^1]: https://www.datadoghq.com/blog/the-power-of-tagged-metrics/#whats-a-metric-tag **Testing:** Added unit test coverage for parsing simple tags. --------- Co-authored-by: Alex Boten --- .chloggen/statsd-support-simple-tags.yaml | 28 ++++++ receiver/statsdreceiver/README.md | 2 + receiver/statsdreceiver/config.go | 1 + .../internal/protocol/parser.go | 2 +- .../internal/protocol/statsd_parser.go | 27 ++++-- .../internal/protocol/statsd_parser_test.go | 87 ++++++++++++++++--- receiver/statsdreceiver/receiver.go | 1 + 7 files changed, 126 insertions(+), 22 deletions(-) create mode 100755 .chloggen/statsd-support-simple-tags.yaml diff --git a/.chloggen/statsd-support-simple-tags.yaml b/.chloggen/statsd-support-simple-tags.yaml new file mode 100755 index 000000000000..c753eccf6493 --- /dev/null +++ b/.chloggen/statsd-support-simple-tags.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes.
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "statsdreceiver" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for 'simple' tags that do not have a defined value, to accommodate DogStatsD metrics that may utilize these. + + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29012] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: "This functionality is gated behind a new `enable_simple_tags` config boolean, as it is not part of the StatsD spec." + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ['user'] diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index db55b5aa773c..50124c748450 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -33,6 +33,8 @@ The Following settings are optional: - `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label. +- `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. 
`#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging. + - `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic. - `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to. diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index 76dd700b6374..4e9a2d5eb75d 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -19,6 +19,7 @@ type Config struct { NetAddr confignet.NetAddr `mapstructure:",squash"` AggregationInterval time.Duration `mapstructure:"aggregation_interval"` EnableMetricType bool `mapstructure:"enable_metric_type"` + EnableSimpleTags bool `mapstructure:"enable_simple_tags"` IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"` } diff --git a/receiver/statsdreceiver/internal/protocol/parser.go b/receiver/statsdreceiver/internal/protocol/parser.go index abfea560ce24..bc72f7e3f160 100644 --- a/receiver/statsdreceiver/internal/protocol/parser.go +++ b/receiver/statsdreceiver/internal/protocol/parser.go @@ -12,7 +12,7 @@ import ( // Parser is something that can map input StatsD strings to OTLP Metric representations. 
type Parser interface { - Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error + Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error GetMetrics() []BatchMetrics Aggregate(line string, addr net.Addr) error } diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index ab624dd93df4..474fe03945ba 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -79,6 +79,7 @@ var defaultObserverCategory = ObserverCategory{ type StatsDParser struct { instrumentsByAddress map[netAddr]*instruments enableMetricType bool + enableSimpleTags bool isMonotonicCounter bool timerEvents ObserverCategory histogramEvents ObserverCategory @@ -156,12 +157,13 @@ func (p *StatsDParser) resetState(when time.Time) { p.instrumentsByAddress = make(map[netAddr]*instruments) } -func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { +func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { p.resetState(timeNowFunc()) p.histogramEvents = defaultObserverCategory p.timerEvents = defaultObserverCategory p.enableMetricType = enableMetricType + p.enableSimpleTags = enableSimpleTags p.isMonotonicCounter = isMonotonicCounter // Note: validation occurs in ("../".Config).validate() for _, eachMap := range sendTimerHistogram { @@ -270,7 +272,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory { // Aggregate for each metric line. 
func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { - parsedMetric, err := parseMessageToMetric(line, p.enableMetricType) + parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags) if err != nil { return err } @@ -349,7 +351,7 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { return nil } -func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) { +func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) { result := statsDMetric{} parts := strings.Split(line, "|") @@ -410,11 +412,22 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err for _, tagSet := range tagSets { tagParts := strings.SplitN(tagSet, ":", 2) - if len(tagParts) != 2 { - return result, fmt.Errorf("invalid tag format: %s", tagParts) - } k := tagParts[0] - v := tagParts[1] + if k == "" { + return result, fmt.Errorf("invalid tag format: %q", tagSet) + } + + // support both simple tags (w/o value) and dimension tags (w/ value). + // dogstatsd notably allows simple tags. 
+ var v string + if len(tagParts) == 2 { + v = tagParts[1] + } + + if v == "" && !enableSimpleTags { + return result, fmt.Errorf("invalid tag format: %q", tagSet) + } + kvs = append(kvs, attribute.String(k, v)) } default: diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index 2228a29567c4..d91ee7c3ed27 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -53,8 +53,8 @@ func Test_ParseMessageToMetric(t *testing.T) { }, { name: "invalid tag format", - input: "test.metric:42|c|#key1", - err: errors.New("invalid tag format: [key1]"), + input: "test.metric:42|c|#:val1", + err: errors.New("invalid tag format: \":val1\""), }, { name: "unrecognized message part", @@ -235,7 +235,7 @@ func Test_ParseMessageToMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, false) + got, err := parseMessageToMetric(tt.input, false, false) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -433,7 +433,66 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, true) + got, err := parseMessageToMetric(tt.input, true, false) + + if tt.err != nil { + assert.Equal(t, tt.err, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantMetric, got) + } + }) + } +} + +func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) { + tests := []struct { + name string + input string + wantMetric statsDMetric + err error + }{ + { + name: "counter metric with sample rate and (dimensional) tag", + input: "test.metric:42|c|@0.1|#key:value", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{"value"}), + }, + { + name: "counter metric with sample rate and (simple) 
tag", + input: "test.metric:42|c|@0.1|#key", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{""}), + }, + { + name: "counter metric with sample rate and two (simple) tags", + input: "test.metric:42|c|@0.1|#key,key2", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key", "key2"}, + []string{"", ""}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseMessageToMetric(tt.input, false, true) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -677,7 +736,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -746,7 +805,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) for i, addr := range tt.addresses { for _, line := range tt.input[i] { @@ -814,7 +873,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, 
false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -864,7 +923,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -986,7 +1045,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) for _, line := range tt.input { @@ -1003,7 +1062,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { func TestStatsDParser_Initialize(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + 
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) teststatsdDMetricdescription := statsDMetricDescription{ name: "test", metricType: "g", @@ -1022,7 +1081,7 @@ func TestStatsDParser_Initialize(t *testing.T) { func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) instrument := newInstruments(nil) instrument.gauges[testDescription("statsdTestMetric1", "g", []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric(testStatsDMetric("testGauge1", 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0)) @@ -1095,7 +1154,7 @@ func TestStatsDParser_Mappings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tc.mapping)) + assert.NoError(t, p.Initialize(false, false, false, tc.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") assert.NoError(t, p.Aggregate("H:10|h", addr)) @@ -1129,7 +1188,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) { } testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") - err := p.Initialize(true, false, + err := p.Initialize(true, false, false, []TimerHistogramMapping{ {StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "histogram"}, @@ -1399,7 +1458,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tt.mapping)) + 
assert.NoError(t, p.Initialize(false, false, false, tt.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") for _, line := range tt.input { err = p.Aggregate(line, addr) diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 74d4354e4a62..3034fc561bbc 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -91,6 +91,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { ticker := time.NewTicker(r.config.AggregationInterval) err = r.parser.Initialize( r.config.EnableMetricType, + r.config.EnableSimpleTags, r.config.IsMonotonicCounter, r.config.TimerHistogramMapping, )