Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a filter Aggregator. #3040

68 changes: 68 additions & 0 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
type filter[N int64 | float64] struct {
filter func(attribute.Set) attribute.Set
aggregator Aggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn func(attribute.Set) attribute.Set) Aggregator[N] {
if fn == nil {
return agg
}
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.

// TODO (#3006): drop stale attributes from seen.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
f.Lock()
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr = f.filter(attr)
f.seen[attr] = fAttr
}
f.aggregator.Aggregate(measurement, fAttr)
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}
202 changes: 202 additions & 0 deletions sdk/metric/internal/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// This is an aggregator that has a stable output, used for testing. It does not
// follow any spec prescribed aggregation.
type testStableAggregator[N int64 | float64] struct {
sync.Mutex
values []metricdata.DataPoint[N]
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (a *testStableAggregator[N]) Aggregate(measurement N, attr attribute.Set) {
a.Lock()
defer a.Unlock()

a.values = append(a.values, metricdata.DataPoint[N]{
Attributes: attr,
Value: measurement,
})
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation {
return metricdata.Gauge[N]{
DataPoints: a.values,
}
}

func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
filter := NewFilter(agg, nil)
assert.Equal(t, agg, filter)
}

func testNewFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
f := NewFilter(agg, testAttributeFilter)
require.IsType(t, &filter[N]{}, f)
filt := f.(*filter[N])
assert.Equal(t, agg, filt.aggregator)
}

func testAttributeFilter(input attribute.Set) attribute.Set {
out, _ := input.Filter(func(kv attribute.KeyValue) bool {
return kv.Key == "power-level"
})
return out
}

func TestNewFilter(t *testing.T) {
t.Run("int64", func(t *testing.T) {
agg := &testStableAggregator[int64]{}
testNewFilterNoFilter[int64](t, agg)
testNewFilter[int64](t, agg)
})
t.Run("float64", func(t *testing.T) {
agg := &testStableAggregator[float64]{}
testNewFilterNoFilter[float64](t, agg)
testNewFilter[float64](t, agg)
})
}

func testDataPoint[N int64 | float64](attr attribute.Set) metricdata.DataPoint[N] {
return metricdata.DataPoint[N]{
Attributes: attr,
Value: 1,
}
}

func testFilterAggregate[N int64 | float64](t *testing.T) {
testCases := []struct {
name string
inputAttr []attribute.Set
output []metricdata.DataPoint[N]
}{
{
name: "Will filter all out",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
testDataPoint[N](*attribute.EmptySet()),
},
},
{
name: "Will keep appropriate attributes",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
attribute.Float64("lifeUniverseEverything", 42.0),
),
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
},
},
{
name: "Will combine Aggregations",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
),
attribute.NewSet(
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](*attribute.EmptySet()),
testDataPoint[N](*attribute.EmptySet()),
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
for _, set := range tt.inputAttr {
f.Aggregate(1, set)
}
out := f.Aggregation().(metricdata.Gauge[N])
assert.Equal(t, tt.output, out.DataPoints)
})
}
}

func TestFilterAggregate(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterAggregate[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterAggregate[float64](t)
})
}

func testFilterConcurrent[N int64 | float64](t *testing.T) {
f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
f.Aggregate(1, attribute.NewSet(
attribute.String("foo", "bar"),
))
wg.Done()
}()

go func() {
f.Aggregate(1, attribute.NewSet(
attribute.Int("power-level", 9001),
))
wg.Done()
}()

wg.Wait()
}

func TestFilterConcurrent(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterConcurrent[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterConcurrent[float64](t)
})
}