Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/metricstransformprocessor]: Support median aggregation type #33655

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/aggregation-metricstransformprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: metricstransformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds the 'median' aggregation type to the Metrics Transform Processor. Also uses the refactored aggregation business logic from the internal/coreinternal/aggregateutil package."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16224]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 6 additions & 2 deletions processor/metricstransformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ processors:
# new_name specifies the updated name of the metric; if action is insert or combine, new_name is required
new_name: <new_metric_name_inserted>
# aggregation_type defines how combined data points will be aggregated; if action is combine, aggregation_type is required
aggregation_type: {sum, mean, min, max, count}
aggregation_type: {sum, mean, min, max, count, median}
# submatch_case specifies the case that should be used when adding label values based on regexp submatches when performing a combine action; leave blank to use the submatch value as is
submatch_case: {lower, upper}
# operations contain a list of operations that will be performed on the resulting metric(s)
Expand All @@ -106,7 +106,7 @@ processors:
# label_set contains a list of labels that will remain after aggregation; if action is aggregate_labels, label_set is required
label_set: [labels...]
# aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required
aggregation_type: {sum, mean, min, max, count}
aggregation_type: {sum, mean, min, max, count, median}
# experimental_scale specifies the scalar to apply to values
experimental_scale: <scalar>
# value_actions contain a list of operations that will be performed on the selected label
Expand Down Expand Up @@ -273,6 +273,8 @@ operations:
aggregation_type: sum
```

**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes.

### Aggregate label values
```yaml
# aggregate data points with state label value slab_reclaimable & slab_unreclaimable using summation into slab
Expand All @@ -286,6 +288,8 @@ operations:
aggregation_type: sum
```

**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes.

### Combine metrics
```yaml
# convert a set of metrics for each http_method into a single metric with an http_method label, i.e.
Expand Down
38 changes: 4 additions & 34 deletions processor/metricstransformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"

import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"

const (
// includeFieldName is the mapstructure field name for Include field
includeFieldName = "include"
Expand Down Expand Up @@ -75,7 +77,7 @@ type transform struct {

// AggregationType specifies how to aggregate.
// REQUIRED only if Action is COMBINE.
AggregationType aggregationType `mapstructure:"aggregation_type"`
AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"`

// SubmatchCase specifies what case to use for label values created from regexp submatches.
SubmatchCase submatchCase `mapstructure:"submatch_case"`
Expand Down Expand Up @@ -112,7 +114,7 @@ type Operation struct {
LabelSet []string `mapstructure:"label_set"`

// AggregationType specifies how to aggregate.
AggregationType aggregationType `mapstructure:"aggregation_type"`
AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"`

// AggregatedValues is a list of label values to aggregate away.
AggregatedValues []string `mapstructure:"aggregated_values"`
Expand Down Expand Up @@ -216,38 +218,6 @@ func (oa operationAction) isValid() bool {
return false
}

// aggregationType names an aggregation function that the aggregation
// operation can apply. The accepted values are listed in aggregationTypes.
type aggregationType string

const (
	// sum aggregates the data by taking its sum.
	sum aggregationType = "sum"

	// mean aggregates the data by taking its mean.
	mean aggregationType = "mean"

	// min aggregates the data by taking its minimum.
	min aggregationType = "min"

	// max aggregates the data by taking its maximum.
	max aggregationType = "max"

	// count aggregates the data by taking its count.
	count aggregationType = "count"
)

// aggregationTypes holds every aggregationType that isValid accepts.
var aggregationTypes = []aggregationType{sum, mean, min, max, count}

// isValid reports whether at equals one of the entries in aggregationTypes.
func (at aggregationType) isValid() bool {
	for i := range aggregationTypes {
		if aggregationTypes[i] == at {
			return true
		}
	}
	return false
}

// matchType is the enum to capture the two types of matching metric(s) that should have operations applied to them.
type matchType string

Expand Down
9 changes: 5 additions & 4 deletions processor/metricstransformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata"
)

Expand Down Expand Up @@ -88,8 +89,8 @@ func validateConfiguration(config *Config) error {
return fmt.Errorf("missing required field %q while %q is %v", groupResourceLabelsFieldName, actionFieldName, Group)
}

if transform.AggregationType != "" && !transform.AggregationType.isValid() {
return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregationTypes)
if transform.AggregationType != "" && !transform.AggregationType.IsValid() {
return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes)
}

if transform.SubmatchCase != "" && !transform.SubmatchCase.isValid() {
Expand All @@ -114,8 +115,8 @@ func validateConfiguration(config *Config) error {
return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, scaleFieldName, actionFieldName, scaleValue)
}

if op.AggregationType != "" && !op.AggregationType.isValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregationTypes)
if op.AggregationType != "" && !op.AggregationType.IsValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregateutil.AggregationTypes)
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions processor/metricstransformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func TestCreateProcessors(t *testing.T) {
{
configName: "config_invalid_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregationTypes),
errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes),
},
{
configName: "config_invalid_operation_action.yaml",
Expand All @@ -97,7 +98,7 @@ func TestCreateProcessors(t *testing.T) {
{
configName: "config_invalid_operation_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregationTypes),
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregateutil.AggregationTypes),
},
{
configName: "config_invalid_submatchcase.yaml",
Expand Down Expand Up @@ -221,14 +222,14 @@ func TestCreateProcessorsFilledData(t *testing.T) {
{
Action: aggregateLabels,
LabelSet: []string{"label1", "label2"},
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
{
Action: aggregateLabelValues,
Label: "label",
AggregatedValues: []string{"value1", "value2"},
NewValue: "new-value",
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
},
},
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestCreateProcessorsFilledData(t *testing.T) {
configOperation: Operation{
Action: aggregateLabels,
LabelSet: []string{"label1", "label2"},
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
labelSetMap: map[string]bool{
"label1": true,
Expand All @@ -278,7 +279,7 @@ func TestCreateProcessorsFilledData(t *testing.T) {
Label: "label",
AggregatedValues: []string{"value1", "value2"},
NewValue: "new-value",
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
aggregatedValuesSet: map[string]bool{
"value1": true,
Expand Down
3 changes: 3 additions & 0 deletions processor/metricstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/metri
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.105.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -73,3 +74,5 @@ retract (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
)

type metricsTransformProcessor struct {
Expand All @@ -23,7 +25,7 @@ type internalTransform struct {
Action ConfigAction
NewName string
GroupResourceLabels map[string]string
AggregationType aggregationType
AggregationType aggregateutil.AggregationType
SubmatchCase submatchCase
Operations []internalOperation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
)

// extractAndRemoveMatchedMetrics extracts matched metrics from ms metric slice and returns a new slice.
Expand Down Expand Up @@ -440,6 +442,17 @@ func combine(transform internalTransform, metrics pmetric.MetricSlice) pmetric.M
return combinedMetric
}

// groupMetrics gathers the data points of every metric in metrics into a single
// aggregateutil.AggGroups value and then merges those groups into to using the
// aggregation type aggType.
//
// The function has no return value; the outcome is presumably written into to
// by aggregateutil.MergeDataPoints (NOTE(review): confirm against aggregateutil).
// canBeCombined must be called before.
func groupMetrics(metrics pmetric.MetricSlice, aggType aggregateutil.AggregationType, to pmetric.Metric) {
// ag accumulates the groups across all input metrics.
ag := aggregateutil.AggGroups{}
for i := 0; i < metrics.Len(); i++ {
aggregateutil.GroupDataPoints(metrics.At(i), &ag)
}
// Merge once, after every metric has been grouped.
aggregateutil.MergeDataPoints(to, aggType, ag)
}

func copyMetricDetails(from, to pmetric.Metric) {
to.SetName(from.Name())
to.SetUnit(from.Unit())
Expand Down Expand Up @@ -541,7 +554,13 @@ func transformMetric(metric pmetric.Metric, transform internalTransform) bool {
updateLabelOp(metric, op, transform.MetricIncludeFilter)
case aggregateLabels:
if canChangeMetric {
aggregateLabelsOp(metric, op)
attrs := []string{}
for k, v := range op.labelSetMap {
if v {
attrs = append(attrs, k)
}
}
aggregateLabelsOp(metric, attrs, op.configOperation.AggregationType)
}
case aggregateLabelValues:
if canChangeMetric {
Expand Down
Loading