From e85efb3814b4dbbbf14a88ed66ff065ed89d84e0 Mon Sep 17 00:00:00 2001 From: odubajDT <93584209+odubajDT@users.noreply.github.com> Date: Tue, 2 Jul 2024 19:54:30 +0200 Subject: [PATCH] [chore]: move aggregation logic to internal module (#33669) **Description:** - duplicated and enhanced aggregation business logic (with median function) for common usage in follow-up tickets - tests **Link to tracking Issue:** #16224 **Follow-ups:** - https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33655 - https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33334 - https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33423 --------- Signed-off-by: odubajDT --- .../coreinternal/aggregateutil/aggregate.go | 365 +++++++++++ .../aggregateutil/aggregate_test.go | 566 ++++++++++++++++++ internal/coreinternal/aggregateutil/type.go | 60 ++ .../coreinternal/aggregateutil/type_test.go | 66 ++ 4 files changed, 1057 insertions(+) create mode 100644 internal/coreinternal/aggregateutil/aggregate.go create mode 100644 internal/coreinternal/aggregateutil/aggregate_test.go create mode 100644 internal/coreinternal/aggregateutil/type.go create mode 100644 internal/coreinternal/aggregateutil/type_test.go diff --git a/internal/coreinternal/aggregateutil/aggregate.go b/internal/coreinternal/aggregateutil/aggregate.go new file mode 100644 index 000000000000..dcfe176be121 --- /dev/null +++ b/internal/coreinternal/aggregateutil/aggregate.go @@ -0,0 +1,365 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregateutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + +import ( + "encoding/json" + "math" + "sort" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func CopyMetricDetails(from, to pmetric.Metric) { + to.SetName(from.Name()) + to.SetUnit(from.Unit()) + to.SetDescription(from.Description()) + //exhaustive:enforce + switch from.Type() { + case pmetric.MetricTypeGauge: + to.SetEmptyGauge() + case pmetric.MetricTypeSum: + to.SetEmptySum().SetAggregationTemporality(from.Sum().AggregationTemporality()) + to.Sum().SetIsMonotonic(from.Sum().IsMonotonic()) + case pmetric.MetricTypeHistogram: + to.SetEmptyHistogram().SetAggregationTemporality(from.Histogram().AggregationTemporality()) + case pmetric.MetricTypeExponentialHistogram: + to.SetEmptyExponentialHistogram().SetAggregationTemporality(from.ExponentialHistogram().AggregationTemporality()) + case pmetric.MetricTypeSummary: + to.SetEmptySummary() + } +} + +func FilterAttrs(metric pmetric.Metric, filterAttrKeys []string) { + if len(filterAttrKeys) == 0 { + return + } + RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { + attrs.RemoveIf(func(k string, _ pcommon.Value) bool { + return isNotPresent(k, filterAttrKeys) + }) + return true + }) +} + +func GroupDataPoints(metric pmetric.Metric, ag *AggGroups) { + switch metric.Type() { + case pmetric.MetricTypeGauge: + if ag.gauge == nil { + ag.gauge = map[string]pmetric.NumberDataPointSlice{} + } + groupNumberDataPoints(metric.Gauge().DataPoints(), false, ag.gauge) + case pmetric.MetricTypeSum: + if ag.sum == nil { + ag.sum = map[string]pmetric.NumberDataPointSlice{} + } + groupByStartTime := metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta + groupNumberDataPoints(metric.Sum().DataPoints(), groupByStartTime, ag.sum) + case pmetric.MetricTypeHistogram: + if ag.histogram == nil { + 
ag.histogram = map[string]pmetric.HistogramDataPointSlice{} + } + groupByStartTime := metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta + groupHistogramDataPoints(metric.Histogram().DataPoints(), groupByStartTime, ag.histogram) + case pmetric.MetricTypeExponentialHistogram: + if ag.expHistogram == nil { + ag.expHistogram = map[string]pmetric.ExponentialHistogramDataPointSlice{} + } + groupByStartTime := metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta + groupExponentialHistogramDataPoints(metric.ExponentialHistogram().DataPoints(), groupByStartTime, ag.expHistogram) + } +} + +func MergeDataPoints(to pmetric.Metric, aggType AggregationType, ag AggGroups) { + switch to.Type() { + case pmetric.MetricTypeGauge: + mergeNumberDataPoints(ag.gauge, aggType, to.Gauge().DataPoints()) + case pmetric.MetricTypeSum: + mergeNumberDataPoints(ag.sum, aggType, to.Sum().DataPoints()) + case pmetric.MetricTypeHistogram: + mergeHistogramDataPoints(ag.histogram, to.Histogram().DataPoints()) + case pmetric.MetricTypeExponentialHistogram: + mergeExponentialHistogramDataPoints(ag.expHistogram, to.ExponentialHistogram().DataPoints()) + } +} + +// RangeDataPointAttributes calls f sequentially on attributes of every metric data point. +// The iteration terminates if f returns false. +func RangeDataPointAttributes(metric pmetric.Metric, f func(pcommon.Map) bool) { + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + for i := 0; i < metric.Gauge().DataPoints().Len(); i++ { + dp := metric.Gauge().DataPoints().At(i) + if !f(dp.Attributes()) { + return + } + } + case pmetric.MetricTypeSum: + for i := 0; i < metric.Sum().DataPoints().Len(); i++ { + dp := metric.Sum().DataPoints().At(i) + if !f(dp.Attributes()) { + return + } + } + case pmetric.MetricTypeHistogram: + for i := 0; i < metric.Histogram().DataPoints().Len(); i++ { + dp := metric.Histogram().DataPoints().At(i) + if !f(dp.Attributes()) { + return + } + } + case pmetric.MetricTypeExponentialHistogram: + for i := 0; i < metric.ExponentialHistogram().DataPoints().Len(); i++ { + dp := metric.ExponentialHistogram().DataPoints().At(i) + if !f(dp.Attributes()) { + return + } + } + case pmetric.MetricTypeSummary: + for i := 0; i < metric.Summary().DataPoints().Len(); i++ { + dp := metric.Summary().DataPoints().At(i) + if !f(dp.Attributes()) { + return + } + } + } +} + +func isNotPresent(target string, arr []string) bool { + for _, item := range arr { + if item == target { + return false + } + } + return true +} + +func mergeNumberDataPoints(dpsMap map[string]pmetric.NumberDataPointSlice, agg AggregationType, to pmetric.NumberDataPointSlice) { + for _, dps := range dpsMap { + dp := to.AppendEmpty() + dps.At(0).MoveTo(dp) + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + medianNumbers := []float64{dp.DoubleValue()} + for i := 1; i < dps.Len(); i++ { + switch agg { + case Sum, Mean: + dp.SetDoubleValue(dp.DoubleValue() + doubleVal(dps.At(i))) + case Max: + dp.SetDoubleValue(math.Max(dp.DoubleValue(), doubleVal(dps.At(i)))) + case Min: + dp.SetDoubleValue(math.Min(dp.DoubleValue(), doubleVal(dps.At(i)))) + case Median: + medianNumbers = append(medianNumbers, doubleVal(dps.At(i))) + case Count: + dp.SetDoubleValue(float64(dps.Len())) + } + if dps.At(i).StartTimestamp() < dp.StartTimestamp() { + dp.SetStartTimestamp(dps.At(i).StartTimestamp()) + } + } + if agg == Mean { + dp.SetDoubleValue(dp.DoubleValue() / float64(dps.Len())) + } + if agg == 
Median { + if len(medianNumbers) == 1 { + dp.SetDoubleValue(medianNumbers[0]) + } else { + sort.Float64s(medianNumbers) + mNumber := len(medianNumbers) / 2 + if math.Mod(float64(len(medianNumbers)), 2) != 0 { + dp.SetDoubleValue(medianNumbers[mNumber]) + } else { + dp.SetDoubleValue((medianNumbers[mNumber-1] + medianNumbers[mNumber]) / 2) + } + } + + } + case pmetric.NumberDataPointValueTypeInt: + medianNumbers := []int64{dp.IntValue()} + for i := 1; i < dps.Len(); i++ { + switch agg { + case Sum, Mean: + dp.SetIntValue(dp.IntValue() + dps.At(i).IntValue()) + case Max: + if dp.IntValue() < intVal(dps.At(i)) { + dp.SetIntValue(intVal(dps.At(i))) + } + case Min: + if dp.IntValue() > intVal(dps.At(i)) { + dp.SetIntValue(intVal(dps.At(i))) + } + case Median: + medianNumbers = append(medianNumbers, intVal(dps.At(i))) + case Count: + dp.SetIntValue(int64(dps.Len())) + } + if dps.At(i).StartTimestamp() < dp.StartTimestamp() { + dp.SetStartTimestamp(dps.At(i).StartTimestamp()) + } + } + if agg == Median { + if len(medianNumbers) == 1 { + dp.SetIntValue(medianNumbers[0]) + } else { + sort.Slice(medianNumbers, func(i, j int) bool { + return medianNumbers[i] < medianNumbers[j] + }) + mNumber := len(medianNumbers) / 2 + if math.Mod(float64(len(medianNumbers)), 2) != 0 { + dp.SetIntValue(medianNumbers[mNumber]) + } else { + dp.SetIntValue((medianNumbers[mNumber-1] + medianNumbers[mNumber]) / 2) + } + } + } + if agg == Mean { + dp.SetIntValue(dp.IntValue() / int64(dps.Len())) + } + } + } +} + +func doubleVal(dp pmetric.NumberDataPoint) float64 { + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + return dp.DoubleValue() + case pmetric.NumberDataPointValueTypeInt: + return float64(dp.IntValue()) + } + return 0 +} + +func intVal(dp pmetric.NumberDataPoint) int64 { + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + return int64(dp.DoubleValue()) + case pmetric.NumberDataPointValueTypeInt: + return dp.IntValue() + } + return 0 +} + +func mergeHistogramDataPoints(dpsMap map[string]pmetric.HistogramDataPointSlice, to pmetric.HistogramDataPointSlice) { + for _, dps := range dpsMap { + dp := to.AppendEmpty() + dps.At(0).MoveTo(dp) + counts := dp.BucketCounts() + for i := 1; i < dps.Len(); i++ { + if dps.At(i).Count() == 0 { + continue + } + dp.SetCount(dp.Count() + dps.At(i).Count()) + dp.SetSum(dp.Sum() + dps.At(i).Sum()) + if dp.HasMin() && dp.Min() > dps.At(i).Min() { + dp.SetMin(dps.At(i).Min()) + } + if dp.HasMax() && dp.Max() < dps.At(i).Max() { + dp.SetMax(dps.At(i).Max()) + } + for b := 0; b < dps.At(i).BucketCounts().Len(); b++ { + counts.SetAt(b, counts.At(b)+dps.At(i).BucketCounts().At(b)) + } + dps.At(i).Exemplars().MoveAndAppendTo(dp.Exemplars()) + if dps.At(i).StartTimestamp() < dp.StartTimestamp() { + dp.SetStartTimestamp(dps.At(i).StartTimestamp()) + } + } + } +} + +func mergeExponentialHistogramDataPoints(dpsMap map[string]pmetric.ExponentialHistogramDataPointSlice, + to pmetric.ExponentialHistogramDataPointSlice) { + for _, dps := range dpsMap { + dp := to.AppendEmpty() + dps.At(0).MoveTo(dp) + negatives := dp.Negative().BucketCounts() + positives := dp.Positive().BucketCounts() + for i := 1; i < dps.Len(); i++ { + if dps.At(i).Count() == 0 { + continue + } + dp.SetCount(dp.Count() + dps.At(i).Count()) + dp.SetSum(dp.Sum() + dps.At(i).Sum()) + if dp.HasMin() && dp.Min() > dps.At(i).Min() { + dp.SetMin(dps.At(i).Min()) + } + if dp.HasMax() && dp.Max() < dps.At(i).Max() { + dp.SetMax(dps.At(i).Max()) + } + for b := 0; b < 
dps.At(i).Negative().BucketCounts().Len(); b++ { + negatives.SetAt(b, negatives.At(b)+dps.At(i).Negative().BucketCounts().At(b)) + } + for b := 0; b < dps.At(i).Positive().BucketCounts().Len(); b++ { + positives.SetAt(b, positives.At(b)+dps.At(i).Positive().BucketCounts().At(b)) + } + dps.At(i).Exemplars().MoveAndAppendTo(dp.Exemplars()) + if dps.At(i).StartTimestamp() < dp.StartTimestamp() { + dp.SetStartTimestamp(dps.At(i).StartTimestamp()) + } + } + } +} + +func groupNumberDataPoints(dps pmetric.NumberDataPointSlice, useStartTime bool, + dpsByAttrsAndTs map[string]pmetric.NumberDataPointSlice) { + var keyHashParts []any + for i := 0; i < dps.Len(); i++ { + if useStartTime { + keyHashParts = []any{dps.At(i).StartTimestamp().String()} + } + key := dataPointHashKey(dps.At(i).Attributes(), dps.At(i).Timestamp(), keyHashParts...) + if _, ok := dpsByAttrsAndTs[key]; !ok { + dpsByAttrsAndTs[key] = pmetric.NewNumberDataPointSlice() + } + dps.At(i).MoveTo(dpsByAttrsAndTs[key].AppendEmpty()) + } +} + +func groupHistogramDataPoints(dps pmetric.HistogramDataPointSlice, useStartTime bool, + dpsByAttrsAndTs map[string]pmetric.HistogramDataPointSlice) { + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + keyHashParts := make([]any, 0, dp.ExplicitBounds().Len()+4) + for b := 0; b < dp.ExplicitBounds().Len(); b++ { + keyHashParts = append(keyHashParts, dp.ExplicitBounds().At(b)) + } + if useStartTime { + keyHashParts = append(keyHashParts, dp.StartTimestamp().String()) + } + + keyHashParts = append(keyHashParts, dp.HasMin(), dp.HasMax(), uint32(dp.Flags())) + key := dataPointHashKey(dps.At(i).Attributes(), dp.Timestamp(), keyHashParts...) + if _, ok := dpsByAttrsAndTs[key]; !ok { + dpsByAttrsAndTs[key] = pmetric.NewHistogramDataPointSlice() + } + dp.MoveTo(dpsByAttrsAndTs[key].AppendEmpty()) + } +} + +func groupExponentialHistogramDataPoints(dps pmetric.ExponentialHistogramDataPointSlice, useStartTime bool, + dpsByAttrsAndTs map[string]pmetric.ExponentialHistogramDataPointSlice) { + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + keyHashParts := make([]any, 0, 5) + keyHashParts = append(keyHashParts, dp.Scale(), dp.HasMin(), dp.HasMax(), uint32(dp.Flags()), dp.Negative().Offset(), + dp.Positive().Offset()) + if useStartTime { + keyHashParts = append(keyHashParts, dp.StartTimestamp().String()) + } + key := dataPointHashKey(dps.At(i).Attributes(), dp.Timestamp(), keyHashParts...) 
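+		// Points share a key only when their attributes, timestamp, and the structural
+		// fields gathered above all match, so the slices built here are safe to merge.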
+		if _, ok := dpsByAttrsAndTs[key]; !ok {
+			dpsByAttrsAndTs[key] = pmetric.NewExponentialHistogramDataPointSlice()
+		}
+		dp.MoveTo(dpsByAttrsAndTs[key].AppendEmpty())
+	}
+}
+
+func dataPointHashKey(atts pcommon.Map, ts pcommon.Timestamp, other ...any) string {
+	hashParts := []any{atts.AsRaw(), ts.String()}
+	jsonStr, _ := json.Marshal(append(hashParts, other...))
+	return string(jsonStr)
+}
diff --git a/internal/coreinternal/aggregateutil/aggregate_test.go b/internal/coreinternal/aggregateutil/aggregate_test.go
new file mode 100644
index 000000000000..a1e986b988e2
--- /dev/null
+++ b/internal/coreinternal/aggregateutil/aggregate_test.go
@@ -0,0 +1,566 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregateutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+func Test_CopyMetricDetails(t *testing.T) {
+	gaugeFunc := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetDescription("desc")
+		m.SetName("name")
+		m.SetUnit("unit")
+		m.SetEmptyGauge()
+		return m
+	}
+
+	sumFunc := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetDescription("desc")
+		m.SetName("name")
+		m.SetUnit("unit")
+		s := m.SetEmptySum()
+		s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		s.SetIsMonotonic(true)
+		return m
+	}
+
+	summaryFunc := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetDescription("desc")
+		m.SetName("name")
+		m.SetUnit("unit")
+		m.SetEmptySummary()
+		return m
+	}
+
+	histogramFunc := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetDescription("desc")
+		m.SetName("name")
+		m.SetUnit("unit")
+		m.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		return m
+	}
+
+	expHistogramFunc := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetDescription("desc")
+		m.SetName("name")
+		m.SetUnit("unit")
+		m.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+		return m
+	}
+	tests := []struct {
+		name string
+		from func() pmetric.Metric
+		to   func() pmetric.Metric
+	}{
+		{
+			name: "gauge",
+			from: gaugeFunc,
+			to:   gaugeFunc,
+		},
+		{
+			name: "summary",
+			from: summaryFunc,
+			to:   summaryFunc,
+		},
+		{
+			name: "sum",
+			from: sumFunc,
+			to:   sumFunc,
+		},
+		{
+			name: "histogram",
+			from: histogramFunc,
+			to:   histogramFunc,
+		},
+		{
+			name: "exp histogram",
+			from: expHistogramFunc,
+			to:   expHistogramFunc,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := pmetric.NewMetric()
+			from := tt.from()
+			to := tt.to()
+			CopyMetricDetails(from, result)
+			require.Equal(t, to, result)
+		})
+	}
+}
+
+func Test_FilterAttributes(t *testing.T) {
+	tests := []struct {
+		name string
+		attr []string
+		want func() pmetric.Metric
+	}{
+		{
+			name: "nil",
+			attr: nil,
+			want: func() pmetric.Metric {
+				m := pmetric.NewMetric()
+				s := m.SetEmptySum()
+				d := s.DataPoints().AppendEmpty()
+				d.Attributes().PutStr("attr1", "val1")
+				d.Attributes().PutStr("attr2", "val2")
+				return m
+			},
+		},
+		{
+			name: "empty",
+			attr: []string{},
+			want: func() pmetric.Metric {
+				m := pmetric.NewMetric()
+				s := m.SetEmptySum()
+				d := s.DataPoints().AppendEmpty()
+				d.Attributes().PutStr("attr1", "val1")
+				d.Attributes().PutStr("attr2", "val2")
+				return m
+			},
+		},
+		{
+			name: "valid",
+			attr:
[]string{"attr1"}, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := pmetric.NewMetric() + s := m.SetEmptySum() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + + FilterAttrs(m, tt.attr) + require.Equal(t, tt.want(), m) + }) + } +} + +func Test_RangeDataPointAttributes(t *testing.T) { + fun := func(attrs pcommon.Map) bool { + attrs.RemoveIf(func(k string, _ pcommon.Value) bool { + return isNotPresent(k, []string{"attr1"}) + }) + return true + } + + tests := []struct { + name string + in func() pmetric.Metric + want func() pmetric.Metric + }{ + { + name: "sum", + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + return m + }, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + { + name: "gauge", + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyGauge() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + return m + }, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyGauge() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + { + name: "summary", + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySummary() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + return m + }, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySummary() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + { + name: "histogram", + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + return m + }, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + { + name: "exp histogram", + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyExponentialHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.Attributes().PutStr("attr2", "val2") + return m + }, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyExponentialHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + return m + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := tt.in() + RangeDataPointAttributes(m, fun) + require.Equal(t, tt.want(), m) + }) + } +} + +func Test_GroupDataPoints(t *testing.T) { + mapAttr := pcommon.NewMap() + mapAttr.PutStr("attr1", "val1") + hash := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{})) + + hashHistogram := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{}), false, false, 0) + + hashExpHistogram := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{}), 0, false, false, 0, 0, 0) + 
+ tests := []struct { + name string + in func() pmetric.Metric + aggGroup AggGroups + want AggGroups + }{ + { + name: "sum", + aggGroup: AggGroups{ + sum: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumber(), + }, + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + d := s.DataPoints().AppendEmpty() + d.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + d.Attributes().PutStr("attr1", "val1") + d.SetIntValue(5) + return m + }, + want: AggGroups{ + sum: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumberDouble(), + }, + }, + }, + { + name: "gauge", + aggGroup: AggGroups{ + gauge: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumber(), + }, + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyGauge() + d := s.DataPoints().AppendEmpty() + d.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + d.Attributes().PutStr("attr1", "val1") + d.SetIntValue(5) + return m + }, + want: AggGroups{ + gauge: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumberDouble(), + }, + }, + }, + { + name: "histogram", + aggGroup: AggGroups{ + histogram: map[string]pmetric.HistogramDataPointSlice{ + hashHistogram: testDataHistogram(), + }, + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyHistogram() + s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + d := s.DataPoints().AppendEmpty() + d.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + d.Attributes().PutStr("attr1", "val1") + d.SetCount(1) + return m + }, + want: AggGroups{ + histogram: map[string]pmetric.HistogramDataPointSlice{ + hashHistogram: testDataHistogramDouble(), + }, + }, + }, + { + name: "exp histogram", + aggGroup: AggGroups{ + expHistogram: map[string]pmetric.ExponentialHistogramDataPointSlice{ + hashExpHistogram: testDataExpHistogram(), + }, + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyExponentialHistogram() + s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + d := s.DataPoints().AppendEmpty() + d.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + d.Attributes().PutStr("attr1", "val1") + d.SetCount(1) + return m + }, + want: AggGroups{ + expHistogram: map[string]pmetric.ExponentialHistogramDataPointSlice{ + hashExpHistogram: testDataExpHistogramDouble(), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := tt.aggGroup + GroupDataPoints(tt.in(), &a) + require.Equal(t, tt.want, a) + }) + } +} + +func Test_MergeDataPoints(t *testing.T) { + mapAttr := pcommon.NewMap() + mapAttr.PutStr("attr1", "val1") + + hash := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{})) + + hashHistogram := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{}), false, false, 0) + + hashExpHistogram := dataPointHashKey(mapAttr, pcommon.NewTimestampFromTime(time.Time{}), 0, false, false, 0, 0, 0) + + tests := []struct { + name string + typ AggregationType + aggGroup AggGroups + want func() pmetric.Metric + in func() pmetric.Metric + }{ + { + name: "sum", + aggGroup: AggGroups{ + sum: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumberDouble(), + }, + }, + typ: Sum, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + 
d.SetIntValue(6) + return m + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptySum() + s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + return m + }, + }, + { + name: "gauge", + aggGroup: AggGroups{ + gauge: map[string]pmetric.NumberDataPointSlice{ + hash: testDataNumberDouble(), + }, + }, + typ: Sum, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyGauge() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetIntValue(6) + return m + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + m.SetEmptyGauge() + return m + }, + }, + { + name: "histogram", + aggGroup: AggGroups{ + histogram: map[string]pmetric.HistogramDataPointSlice{ + hashHistogram: testDataHistogramDouble(), + }, + }, + typ: Sum, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetCount(3) + d.SetSum(0) + return m + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + m.SetEmptyHistogram() + return m + }, + }, + { + name: "exp histogram", + aggGroup: AggGroups{ + expHistogram: map[string]pmetric.ExponentialHistogramDataPointSlice{ + hashExpHistogram: testDataExpHistogramDouble(), + }, + }, + typ: Sum, + want: func() pmetric.Metric { + m := pmetric.NewMetric() + s := m.SetEmptyExponentialHistogram() + d := s.DataPoints().AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetCount(3) + d.SetSum(0) + return m + }, + in: func() pmetric.Metric { + m := pmetric.NewMetric() + m.SetEmptyExponentialHistogram() + return m + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := tt.in() + MergeDataPoints(m, tt.typ, tt.aggGroup) + require.Equal(t, tt.want(), m) + }) + } +} + +func testDataNumber() pmetric.NumberDataPointSlice { + data := pmetric.NewNumberDataPointSlice() + d := data.AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetIntValue(1) + return data +} + +func testDataNumberDouble() pmetric.NumberDataPointSlice { + dataWant := pmetric.NewNumberDataPointSlice() + dWant := dataWant.AppendEmpty() + dWant.Attributes().PutStr("attr1", "val1") + dWant.SetIntValue(1) + dWant2 := dataWant.AppendEmpty() + dWant2.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + dWant2.Attributes().PutStr("attr1", "val1") + dWant2.SetIntValue(5) + return dataWant +} + +func testDataHistogram() pmetric.HistogramDataPointSlice { + data := pmetric.NewHistogramDataPointSlice() + d := data.AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetCount(2) + return data +} + +func testDataHistogramDouble() pmetric.HistogramDataPointSlice { + dataWant := pmetric.NewHistogramDataPointSlice() + dWant := dataWant.AppendEmpty() + dWant.Attributes().PutStr("attr1", "val1") + dWant.SetCount(2) + dWant2 := dataWant.AppendEmpty() + dWant2.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{})) + dWant2.Attributes().PutStr("attr1", "val1") + dWant2.SetCount(1) + return dataWant +} + +func testDataExpHistogram() pmetric.ExponentialHistogramDataPointSlice { + data := pmetric.NewExponentialHistogramDataPointSlice() + d := data.AppendEmpty() + d.Attributes().PutStr("attr1", "val1") + d.SetCount(2) + return data +} + +func testDataExpHistogramDouble() pmetric.ExponentialHistogramDataPointSlice { + dataWant := pmetric.NewExponentialHistogramDataPointSlice() + dWant := dataWant.AppendEmpty() + dWant.Attributes().PutStr("attr1", "val1") + dWant.SetCount(2) + dWant2 := 
dataWant.AppendEmpty()
+	dWant2.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{}))
+	dWant2.Attributes().PutStr("attr1", "val1")
+	dWant2.SetCount(1)
+	return dataWant
+}
diff --git a/internal/coreinternal/aggregateutil/type.go b/internal/coreinternal/aggregateutil/type.go
new file mode 100644
index 000000000000..1d6a34835874
--- /dev/null
+++ b/internal/coreinternal/aggregateutil/type.go
@@ -0,0 +1,60 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregateutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
+
+import (
+	"fmt"
+
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+// AggregationType is the enum to capture the types of aggregation supported by the aggregation operation.
+type AggregationType string
+
+const (
+	// Sum indicates taking the sum of the aggregated data.
+	Sum AggregationType = "sum"
+
+	// Mean indicates taking the mean of the aggregated data.
+	Mean AggregationType = "mean"
+
+	// Min indicates taking the minimum of the aggregated data.
+	Min AggregationType = "min"
+
+	// Max indicates taking the maximum of the aggregated data.
+	Max AggregationType = "max"
+
+	// Median indicates taking the median of the aggregated data.
+	Median AggregationType = "median"
+
+	// Count indicates taking the count of the aggregated data.
+	Count AggregationType = "count"
+)
+
+var AggregationTypes = []AggregationType{Sum, Mean, Min, Max, Median, Count}
+
+func (at AggregationType) IsValid() bool {
+	for _, aggregationType := range AggregationTypes {
+		if at == aggregationType {
+			return true
+		}
+	}
+
+	return false
+}
+
+type AggGroups struct {
+	gauge        map[string]pmetric.NumberDataPointSlice
+	sum          map[string]pmetric.NumberDataPointSlice
+	histogram    map[string]pmetric.HistogramDataPointSlice
+	expHistogram map[string]pmetric.ExponentialHistogramDataPointSlice
+}
+
+func ConvertToAggregationType(str string) (AggregationType, error) {
+	a := AggregationType(str)
+	if a.IsValid() {
+		return a, nil
+	}
+	return a, fmt.Errorf("unsupported function: '%s'", str)
+}
diff --git a/internal/coreinternal/aggregateutil/type_test.go b/internal/coreinternal/aggregateutil/type_test.go
new file mode 100644
index 000000000000..9ff63c4e7404
--- /dev/null
+++ b/internal/coreinternal/aggregateutil/type_test.go
@@ -0,0 +1,66 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregateutil
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_AggregationType_IsValid(t *testing.T) {
+	tests := []struct {
+		name string
+		in   AggregationType
+		want bool
+	}{
+		{
+			name: "valid",
+			in:   Mean,
+			want: true,
+		},
+
+		{
+			name: "invalid",
+			in:   AggregationType("invalid"),
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require.Equal(t, tt.want, tt.in.IsValid())
+		})
+	}
+}
+
+func Test_AggregationType_Convert(t *testing.T) {
+	tests := []struct {
+		name    string
+		in      string
+		want    AggregationType
+		wantErr error
+	}{
+		{
+			name:    "valid",
+			in:      "mean",
+			want:    Mean,
+			wantErr: nil,
+		},
+
+		{
+			name:    "invalid",
+			in:      "invalid",
+			want:    "invalid",
+			wantErr: fmt.Errorf("unsupported function: 'invalid'"),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := ConvertToAggregationType(tt.in)
+			require.Equal(t, tt.want, got)
+			require.Equal(t, tt.wantErr, err)
+		})
+	}
+}
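
A minimal sketch of how a caller inside this repo might drive the package (the single-gauge setup and all names below are illustrative only, not part of this patch): filter the data point attributes down to the grouping keys, group the points, copy the metric shell, and merge into it.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"

	// Importable here only because this example lives inside opentelemetry-collector-contrib.
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
)

func main() {
	// Two gauge points that differ only in an attribute we are about to drop.
	m := pmetric.NewMetric()
	m.SetName("requests")
	g := m.SetEmptyGauge()
	dp1 := g.DataPoints().AppendEmpty()
	dp1.Attributes().PutStr("service", "a")
	dp1.Attributes().PutStr("host", "h1")
	dp1.SetIntValue(2)
	dp2 := g.DataPoints().AppendEmpty()
	dp2.Attributes().PutStr("service", "a")
	dp2.Attributes().PutStr("host", "h2")
	dp2.SetIntValue(4)

	// Keep only "service" so both points hash to the same group key.
	aggregateutil.FilterAttrs(m, []string{"service"})

	// GroupDataPoints moves the points out of m into per-key slices.
	var groups aggregateutil.AggGroups
	aggregateutil.GroupDataPoints(m, &groups)

	// Copy the metric shell (name, unit, description, type) and merge into it.
	out := pmetric.NewMetric()
	aggregateutil.CopyMetricDetails(m, out)
	aggregateutil.MergeDataPoints(out, aggregateutil.Sum, groups)

	fmt.Println(out.Gauge().DataPoints().At(0).IntValue()) // 6
}
```

Because GroupDataPoints moves the data points out of the source metric, only the metric-level metadata remains meaningful afterwards, which is exactly what CopyMetricDetails copies into the target.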