Skip to content

Commit

Permalink
[pkg/ottl] Add support for scaling values (#33246)
Browse files Browse the repository at this point in the history
**Description:**

Adds a `Scale` function to the OTTL package. This function can be
applied to `int`/`double` values, as well as metrics of the following
types:

- Sum
- Gauge
- Histogram
- Summary

**Link to tracking Issue:** #16214

**Testing:** Added Unit and E2E tests in the OTTL package. Tested
manually in a sample environment with the following example
configuration:

```
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  transform:
    error_mode: ignore
    metric_statements:
      - context: metric
        statements:
          - set(data_points, Scale(data_points, 10.0))

exporters:
  debug:
    verbosity: detailed
  otlphttp:
    endpoint: "######"
    headers:
      Authorization: "#####"

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlphttp, debug]
      processors: [transform]
```

**Documentation:** Added documentation in the `README` describing all
functions in the `ottl` package

---------

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 25, 2024
1 parent 3cb736b commit 1f88246
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-scale-function.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `scale_metric` function that scales all data points in a metric.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16214]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
16 changes: 16 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ In addition to OTTL functions, the processor defines its own functions to help w
- [convert_summary_count_val_to_sum](#convert_summary_count_val_to_sum)
- [convert_summary_sum_val_to_sum](#convert_summary_sum_val_to_sum)
- [copy_metric](#copy_metric)
- [scale_metric](#scale_metric)

### convert_sum_to_gauge

Expand Down Expand Up @@ -347,6 +348,21 @@ Examples:

- `copy_metric(desc="new desc") where description == "old desc"`

### scale_metric

`scale_metric(factor, Optional[unit])`

The `scale_metric` function multiplies the values in the data points in the metric by the float value `factor`.
If the optional string `unit` is provided, the metric's unit will be set to this value.

The supported metric types are `Gauge`, `Sum`, `Histogram`, and `Summary`.

Examples:

- `scale_metric(0.1)`: Scale the metric by a factor of `0.1`. The unit of the metric will not be modified.
- `scale_metric(10.0, "kWh")`: Scale the metric by a factor of `10.0` and set the unit to `kWh`.

## Examples

### Perform transformation if field does not exist
Expand Down
130 changes: 130 additions & 0 deletions processor/transformprocessor/internal/metrics/func_scale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
)

// ScaleArguments holds the arguments of the "scale_metric" OTTL function.
type ScaleArguments struct {
	// Multiplier is the factor applied to every data point value of the metric.
	Multiplier float64
	// Unit, when provided, replaces the metric's unit after a successful scaling.
	Unit ottl.Optional[ottl.StringGetter[ottlmetric.TransformContext]]
}

// newScaleMetricFactory creates the OTTL factory that registers the
// "scale_metric" function for the metric transform context.
func newScaleMetricFactory() ottl.Factory[ottlmetric.TransformContext] {
	return ottl.NewFactory("scale_metric", &ScaleArguments{}, createScaleFunction)
}

// createScaleFunction validates the factory arguments and builds the
// expression function executed for each "scale_metric" statement.
func createScaleFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
	args, ok := oArgs.(*ScaleArguments)
	if !ok {
		// ScaleArguments is not generic, so the message must not mention a
		// type parameter; errors.New suffices as there is nothing to format.
		return nil, errors.New("ScaleFactory args must be of type *ScaleArguments")
	}

	return Scale(*args)
}

// Scale returns an expression function that multiplies every data point of
// the metric in the transform context by args.Multiplier. When args.Unit is
// present, the metric's unit is replaced after the values have been scaled.
// Exponential histograms and unknown metric types yield an error.
func Scale(args ScaleArguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
	return func(ctx context.Context, tCtx ottlmetric.TransformContext) (any, error) {
		metric := tCtx.GetMetric()

		// Resolve the optional unit before touching any data point so a
		// failing getter leaves the metric unmodified.
		var newUnit *string
		if !args.Unit.IsEmpty() {
			resolved, err := args.Unit.Get().Get(ctx, tCtx)
			if err != nil {
				return nil, fmt.Errorf("could not get unit from ScaleArguments: %w", err)
			}
			newUnit = &resolved
		}

		switch metricType := metric.Type(); metricType {
		case pmetric.MetricTypeGauge:
			scaleMetric(metric.Gauge().DataPoints(), args.Multiplier)
		case pmetric.MetricTypeSum:
			scaleMetric(metric.Sum().DataPoints(), args.Multiplier)
		case pmetric.MetricTypeHistogram:
			scaleHistogram(metric.Histogram().DataPoints(), args.Multiplier)
		case pmetric.MetricTypeSummary:
			scaleSummarySlice(metric.Summary().DataPoints(), args.Multiplier)
		case pmetric.MetricTypeExponentialHistogram:
			return nil, errors.New("exponential histograms are not supported by the 'scale_metric' function")
		default:
			return nil, fmt.Errorf("unsupported metric type: '%v'", metricType)
		}

		// Only update the unit once scaling succeeded.
		if newUnit != nil {
			metric.SetUnit(*newUnit)
		}

		return nil, nil
	}, nil
}

// scaleExemplar multiplies the exemplar's recorded value by multiplier while
// preserving its value type; int values are truncated after scaling as float.
// Exemplars without a value are left untouched.
func scaleExemplar(ex *pmetric.Exemplar, multiplier float64) {
	if ex.ValueType() == pmetric.ExemplarValueTypeInt {
		scaled := float64(ex.IntValue()) * multiplier
		ex.SetIntValue(int64(scaled))
		return
	}
	if ex.ValueType() == pmetric.ExemplarValueTypeDouble {
		ex.SetDoubleValue(ex.DoubleValue() * multiplier)
	}
}

// scaleSummarySlice multiplies the sum and every quantile value of each
// summary data point by multiplier. The count field is not modified.
func scaleSummarySlice(values pmetric.SummaryDataPointSlice, multiplier float64) {
	for i := 0; i < values.Len(); i++ {
		dp := values.At(i)

		dp.SetSum(dp.Sum() * multiplier)

		// Use a distinct index name for the inner loop: the original code
		// shadowed the outer i, which is legal but error-prone. Hoisting
		// QuantileValues() also avoids re-fetching it in the loop condition.
		quantiles := dp.QuantileValues()
		for q := 0; q < quantiles.Len(); q++ {
			qv := quantiles.At(q)
			qv.SetValue(qv.Value() * multiplier)
		}
	}
}

// scaleHistogram multiplies the sum, min, max, explicit bucket bounds, and
// exemplar values of every histogram data point by multiplier. Bucket counts
// are unchanged; counts are not affected by scaling the observed values.
func scaleHistogram(datapoints pmetric.HistogramDataPointSlice, multiplier float64) {
	for i := 0; i < datapoints.Len(); i++ {
		dp := datapoints.At(i)

		// Sum, min, and max are optional on histogram points; scale only
		// those that are present.
		if dp.HasSum() {
			dp.SetSum(dp.Sum() * multiplier)
		}
		if dp.HasMin() {
			dp.SetMin(dp.Min() * multiplier)
		}
		if dp.HasMax() {
			dp.SetMax(dp.Max() * multiplier)
		}

		// Bucket boundaries describe value ranges, so they scale with the values.
		bounds := dp.ExplicitBounds()
		for bi := 0; bi < bounds.Len(); bi++ {
			bounds.SetAt(bi, bounds.At(bi)*multiplier)
		}

		exemplars := dp.Exemplars()
		for ei := 0; ei < exemplars.Len(); ei++ {
			ex := exemplars.At(ei)
			scaleExemplar(&ex, multiplier)
		}
	}
}

// scaleMetric multiplies every number data point by multiplier, keeping each
// point's original value type; int results are truncated after scaling as float.
func scaleMetric(points pmetric.NumberDataPointSlice, multiplier float64) {
	for idx := 0; idx < points.Len(); idx++ {
		point := points.At(idx)
		switch point.ValueType() {
		case pmetric.NumberDataPointValueTypeInt:
			scaled := float64(point.IntValue()) * multiplier
			point.SetIntValue(int64(scaled))
		case pmetric.NumberDataPointValueTypeDouble:
			point.SetDoubleValue(point.DoubleValue() * multiplier)
		default:
			// Empty points carry no value to scale.
		}
	}
}
204 changes: 204 additions & 0 deletions processor/transformprocessor/internal/metrics/func_scale_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
)

// TestScale exercises the Scale expression function across every supported
// metric type (gauge double, gauge int, sum, histogram, summary) plus the
// unsupported exponential histogram, comparing the mutated metric against an
// expected metric built independently by wantFunc.
func TestScale(t *testing.T) {
	type testCase struct {
		name      string
		args      ScaleArguments
		valueFunc func() pmetric.Metric // builds the input metric
		wantFunc  func() pmetric.Metric // builds the expected result
		wantErr   bool
	}
	tests := []testCase{
		{
			// Double gauge: value is scaled and the optional unit is applied.
			name: "scale gauge float metric",
			valueFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptyGauge()
				metric.Gauge().DataPoints().AppendEmpty().SetDoubleValue(10.0)

				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
				Unit: ottl.NewTestingOptional[ottl.StringGetter[ottlmetric.TransformContext]](ottl.StandardStringGetter[ottlmetric.TransformContext]{
					Getter: func(_ context.Context, _ ottlmetric.TransformContext) (any, error) {
						return "kWh", nil
					},
				}),
			},
			wantFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptyGauge()
				metric.SetUnit("kWh")
				metric.Gauge().DataPoints().AppendEmpty().SetDoubleValue(100.0)

				return metric
			},
			wantErr: false,
		},
		{
			// Int gauge: scaling happens in float and is truncated back to int.
			name: "scale gauge int metric",
			valueFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptyGauge()
				metric.Gauge().DataPoints().AppendEmpty().SetIntValue(10)

				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
			},
			wantFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptyGauge()
				metric.Gauge().DataPoints().AppendEmpty().SetIntValue(100.0)

				return metric
			},
			wantErr: false,
		},
		{
			name: "scale sum metric",
			valueFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptySum()
				metric.Sum().DataPoints().AppendEmpty().SetDoubleValue(10.0)

				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
			},
			wantFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetName("test-metric")
				metric.SetEmptySum()
				metric.Sum().DataPoints().AppendEmpty().SetDoubleValue(100.0)

				return metric
			},
			wantErr: false,
		},
		{
			// Histogram: sum, min, max, bounds, and exemplar values scale;
			// count and bucket counts stay as-is.
			name: "scale histogram metric",
			valueFunc: func() pmetric.Metric {
				metric := getTestScalingHistogramMetric(1, 4, 1, 3, []float64{1, 10}, []uint64{1, 2}, []float64{1.0}, 1, 1)
				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
			},
			wantFunc: func() pmetric.Metric {
				metric := getTestScalingHistogramMetric(1, 40, 10, 30, []float64{10, 100}, []uint64{1, 2}, []float64{10.0}, 1, 1)
				return metric
			},
			wantErr: false,
		},
		{
			// Summary: sum and quantile values scale.
			name: "scale summary metric",
			valueFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				dp := metric.SetEmptySummary().DataPoints().AppendEmpty()
				dp.SetSum(10.0)
				qv := dp.QuantileValues().AppendEmpty()
				qv.SetValue(10.0)

				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
			},
			wantFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				dp := metric.SetEmptySummary().DataPoints().AppendEmpty()
				dp.SetSum(100.0)
				qv := dp.QuantileValues().AppendEmpty()
				qv.SetValue(100.0)

				return metric
			},
			wantErr: false,
		},
		{
			name: "unsupported: exponential histogram metric",
			valueFunc: func() pmetric.Metric {
				metric := pmetric.NewMetric()
				metric.SetEmptyExponentialHistogram()
				return metric
			},
			args: ScaleArguments{
				Multiplier: 10.0,
			},
			wantFunc: func() pmetric.Metric {
				// value should not be modified
				metric := pmetric.NewMetric()
				metric.SetEmptyExponentialHistogram()
				return metric
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			target := ottlmetric.NewTransformContext(
				tt.valueFunc(),
				pmetric.NewMetricSlice(),
				pcommon.NewInstrumentationScope(),
				pcommon.NewResource(),
				pmetric.NewScopeMetrics(),
				pmetric.NewResourceMetrics(),
			)

			expressionFunc, _ := Scale(tt.args)
			_, err := expressionFunc(context.Background(), target)

			if tt.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
			// The input metric must equal the expected metric even in the
			// error case, where it should be left unmodified.
			assert.EqualValues(t, tt.wantFunc(), target.GetMetric())
		})
	}
}

// getTestScalingHistogramMetric builds a histogram metric named "test-metric"
// containing a single data point with the given aggregates, explicit bucket
// layout, double-valued exemplars (all stamped with timestamp 1), and the
// given start/end timestamps.
func getTestScalingHistogramMetric(count uint64, sum, min, max float64, bounds []float64, bucketCounts []uint64, exemplars []float64, start, timestamp pcommon.Timestamp) pmetric.Metric {
	metric := pmetric.NewMetric()
	metric.SetName("test-metric")
	dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
	dp.SetCount(count)
	dp.SetSum(sum)
	dp.SetMin(min)
	dp.SetMax(max)
	dp.ExplicitBounds().FromRaw(bounds)
	dp.BucketCounts().FromRaw(bucketCounts)
	for _, value := range exemplars {
		ex := dp.Exemplars().AppendEmpty()
		ex.SetTimestamp(1)
		ex.SetDoubleValue(value)
	}
	dp.SetStartTimestamp(start)
	dp.SetTimestamp(timestamp)
	return metric
}
Loading

0 comments on commit 1f88246

Please sign in to comment.