From 5ad5841aa613cb844be3381a08009097788f3a33 Mon Sep 17 00:00:00 2001 From: Shaun Remekie Date: Wed, 25 Sep 2024 02:13:26 +0200 Subject: [PATCH] [receiver/statsd] Add Option to Aggregate on IP/Host (#34851) **Description:** The `statsdreceiver` only aggregates metrics on `protocol+ip+port`, which leads to issues or inconsistencies when dealing with clients that constantly switch tcp/udp ports. To address the issue, this PR adds a configuration option `enable_ip_only_aggregation` that allows the user to specify whether they want to aggregate on the `IP` instead of `IP+Port`. For example: _otel_config.yaml:_ ```yaml receivers: statsd: endpoint: "0.0.0.0:8125" enable_metric_type: true is_monotonic_counter: false aggregation_interval: 10s enable_ip_only_aggregation: true # <-- enable ip only aggregation timer_histogram_mapping: - statsd_type: "timing" observer_type: "histogram" histogram: max_size: 50 exporters: debug: verbosity: detailed service: pipelines: metrics: receivers: - statsd exporters: - debug ``` _run:_ ```sh STATSD_HOST="localhost" STATSD_PORT=8125 for port in {10000..10010}; do echo -n "my.metric:1|c" | nc -w 1 -u -p $port ${STATSD_HOST} ${STATSD_PORT} echo "Sent from port $port" done ``` _result:_ ``` 2024-08-26T23:36:00.224+0200 info ResourceMetrics #0 Resource SchemaURL: ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope otelcol/statsdreceiver 0.103.0-dev Metric #0 Descriptor: -> Name: -n my.metric -> Description: -> Unit: -> DataType: Sum -> IsMonotonic: false -> AggregationTemporality: Delta NumberDataPoints #0 Data point attributes: -> metric_type: Str(counter) StartTimestamp: 2024-08-26 21:35:50.223101 +0000 UTC Timestamp: 2024-08-26 21:36:00.224252 +0000 UTC Value: 7 {"kind": "exporter", "data_type": "metrics", "name": "debug"} 2024-08-26T23:36:10.224+0200 info MetricsExporter {"kind": "exporter", "data_type": "metrics", "name": "debug", "resource metrics": 1, "metrics": 1, "data points": 1} 2024-08-26T23:36:10.224+0200 info ResourceMetrics #0 
Resource SchemaURL: ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope otelcol/statsdreceiver 0.103.0-dev Metric #0 Descriptor: -> Name: -n my.metric -> Description: -> Unit: -> DataType: Sum -> IsMonotonic: false -> AggregationTemporality: Delta NumberDataPoints #0 Data point attributes: -> metric_type: Str(counter) StartTimestamp: 2024-08-26 21:36:00.224252 +0000 UTC Timestamp: 2024-08-26 21:36:10.224607 +0000 UTC Value: 4 {"kind": "exporter", "data_type": "metrics", "name": "debug"} ``` Instead of generating 11 metrics, one for each port that was used to send, only 2 metric blocks are returned, whose values total 11. ![2024-08-26 23 44 15](https://github.com/user-attachments/assets/6b8a89d1-186e-4257-9c82-90c5f9d14f98) **Link to tracking Issue:** #23809 **Testing:** - [x] Added unit tests **Documentation:** - [x] Added information to the statsdreceiver `README.md` describing the option. --------- Co-authored-by: Povilas Versockas --- .chloggen/es-337.yaml | 28 +++++++++ receiver/statsdreceiver/README.md | 2 + receiver/statsdreceiver/config.go | 13 +++-- .../internal/protocol/parser.go | 2 +- .../internal/protocol/statsd_parser.go | 34 ++++++++--- .../internal/protocol/statsd_parser_test.go | 58 +++++++++++++++---- receiver/statsdreceiver/receiver.go | 1 + 7 files changed, 112 insertions(+), 26 deletions(-) create mode 100644 .chloggen/es-337.yaml diff --git a/.chloggen/es-337.yaml b/.chloggen/es-337.yaml new file mode 100644 index 000000000000..1c7d54bf45d1 --- /dev/null +++ b/.chloggen/es-337.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/statsd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: | + Add support for aggregating on Host/IP. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23809] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index 988c21f81a51..d1601815a28e 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -30,6 +30,8 @@ The Following settings are optional: - `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label. +- `enable_ip_only_aggregation` (default value is false): Enables metric aggregation on `Client+IP` only. Normally, aggregation is performed on `Client+IP+Port`. This setting is useful when the client sends metrics from random ports, or when the receiver should aggregate metrics from the same client but different ports. + - `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging. - `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic. 
diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index 939f18a4b09c..2cb36e8837f7 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -16,12 +16,13 @@ import ( // Config defines configuration for StatsD receiver. type Config struct { - NetAddr confignet.AddrConfig `mapstructure:",squash"` - AggregationInterval time.Duration `mapstructure:"aggregation_interval"` - EnableMetricType bool `mapstructure:"enable_metric_type"` - EnableSimpleTags bool `mapstructure:"enable_simple_tags"` - IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` - TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"` + NetAddr confignet.AddrConfig `mapstructure:",squash"` + AggregationInterval time.Duration `mapstructure:"aggregation_interval"` + EnableIPOnlyAggregation bool `mapstructure:"enable_ip_only_aggregation"` + EnableMetricType bool `mapstructure:"enable_metric_type"` + EnableSimpleTags bool `mapstructure:"enable_simple_tags"` + IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` + TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"` } func (c *Config) Validate() error { diff --git a/receiver/statsdreceiver/internal/protocol/parser.go b/receiver/statsdreceiver/internal/protocol/parser.go index bc72f7e3f160..431847816ded 100644 --- a/receiver/statsdreceiver/internal/protocol/parser.go +++ b/receiver/statsdreceiver/internal/protocol/parser.go @@ -12,7 +12,7 @@ import ( // Parser is something that can map input StatsD strings to OTLP Metric representations. 
type Parser interface { - Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error + Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, enableIPOnlyAggregation bool, sendTimerHistogram []TimerHistogramMapping) error GetMetrics() []BatchMetrics Aggregate(line string, addr net.Addr) error } diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index c9e00e3c0d0b..8a2ad5237c77 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -84,14 +84,15 @@ var defaultObserverCategory = ObserverCategory{ // StatsDParser supports the Parse method for parsing StatsD messages with Tags. type StatsDParser struct { - instrumentsByAddress map[netAddr]*instruments - enableMetricType bool - enableSimpleTags bool - isMonotonicCounter bool - timerEvents ObserverCategory - histogramEvents ObserverCategory - lastIntervalTime time.Time - BuildInfo component.BuildInfo + instrumentsByAddress map[netAddr]*instruments + enableMetricType bool + enableSimpleTags bool + isMonotonicCounter bool + enableIPOnlyAggregation bool + timerEvents ObserverCategory + histogramEvents ObserverCategory + lastIntervalTime time.Time + BuildInfo component.BuildInfo } type instruments struct { @@ -166,7 +167,7 @@ func (p *StatsDParser) resetState(when time.Time) { p.instrumentsByAddress = make(map[netAddr]*instruments) } -func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { +func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, enableIPOnlyAggregation bool, sendTimerHistogram []TimerHistogramMapping) error { p.resetState(timeNowFunc()) p.histogramEvents = defaultObserverCategory @@ -174,6 +175,8 @@ 
func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, p.enableMetricType = enableMetricType p.enableSimpleTags = enableSimpleTags p.isMonotonicCounter = isMonotonicCounter + p.enableIPOnlyAggregation = enableIPOnlyAggregation + // Note: validation occurs in ("../".Config).validate() for _, eachMap := range sendTimerHistogram { switch eachMap.StatsdType { @@ -292,6 +295,10 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { } addrKey := newNetAddr(addr) + if p.enableIPOnlyAggregation { + addrKey = newIPOnlyNetAddr(addr) + } + instrument, ok := p.instrumentsByAddress[addrKey] if !ok { instrument = newInstruments(addr) @@ -494,3 +501,12 @@ type netAddr struct { func newNetAddr(addr net.Addr) netAddr { return netAddr{addr.Network(), addr.String()} } + +func newIPOnlyNetAddr(addr net.Addr) netAddr { + host, _, err := net.SplitHostPort(addr.String()) + if err != nil { + // if there is an error, use the original address + return netAddr{addr.Network(), addr.String()} + } + return netAddr{addr.Network(), host} +} diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index bc7bd2423516..c908e6991aa4 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -1049,7 +1049,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := 
newNetAddr(addr) @@ -1158,7 +1158,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) for i, addr := range tt.addresses { for _, line := range tt.input[i] { @@ -1266,7 +1266,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -1336,7 +1336,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -1463,7 +1463,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t 
*testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary", Summary: SummaryConfig{Percentiles: []float64{0, 95, 99}}}})) + assert.NoError(t, p.Initialize(false, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary", Summary: SummaryConfig{Percentiles: []float64{0, 95, 99}}}})) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) for _, line := range tt.input { @@ -1480,7 +1480,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { func TestStatsDParser_Initialize(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) teststatsdDMetricdescription := statsDMetricDescription{ name: "test", metricType: "g", @@ -1499,7 +1499,7 @@ func TestStatsDParser_Initialize(t *testing.T) { func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) instrument := newInstruments(nil) instrument.gauges[testDescription("statsdTestMetric1", "g", []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric( @@ -1612,7 +1612,7 @@ 
func TestStatsDParser_Mappings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, false, tc.mapping)) + assert.NoError(t, p.Initialize(false, false, false, false, tc.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") assert.NoError(t, p.Aggregate("H:10|h", addr)) @@ -1646,7 +1646,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) { } testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") - err := p.Initialize(true, false, false, + err := p.Initialize(true, false, false, false, []TimerHistogramMapping{ {StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "histogram"}, @@ -1916,7 +1916,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, false, tt.mapping)) + assert.NoError(t, p.Initialize(false, false, false, false, tt.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") for _, line := range tt.input { err = p.Aggregate(line, addr) @@ -1927,3 +1927,41 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { }) } } + +func TestStatsDParser_IPOnlyAggregation(t *testing.T) { + const devVersion = "dev-0.0.1" + p := &StatsDParser{ + BuildInfo: component.BuildInfo{ + Version: devVersion, + }, + } + testAddr01, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") + testAddr02, _ := net.ResolveUDPAddr("udp", "1.2.3.4:8765") + + err := p.Initialize(true, false, false, true, + []TimerHistogramMapping{ + {StatsdType: "timer", ObserverType: "summary"}, + {StatsdType: "histogram", ObserverType: "histogram"}, + }, + ) + + require.NoError(t, err) + require.NoError(t, p.Aggregate("test.metric:1|c", testAddr01)) + require.NoError(t, p.Aggregate("test.metric:3|c", testAddr02)) + require.Len(t, p.instrumentsByAddress, 1) + + for k := range p.instrumentsByAddress { + assert.Equal(t, "1.2.3.4", k.String) + 
assert.Equal(t, "udp", k.Network) + } + metrics := p.GetMetrics() + require.Len(t, metrics, 1) + + value := metrics[0].Metrics. + ResourceMetrics().At(0). + ScopeMetrics().At(0). + Metrics().At(0).Sum().DataPoints().At(0).IntValue() + + assert.Equal(t, int64(4), value) + +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 529e3a9f83ea..d22c5a5be10b 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -107,6 +107,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { r.config.EnableMetricType, r.config.EnableSimpleTags, r.config.IsMonotonicCounter, + r.config.EnableIPOnlyAggregation, r.config.TimerHistogramMapping, ) if err != nil {