Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for malformed selectors in readers #4350

Merged
merged 9 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
- The `ManualReader` will not panic if `AggregationSelector` returns `nil`. (#4350)
- If a Reader's AggregationSelector return nil or DefaultAggregation the pipeline will use the default aggregation. (#4350)
pellared marked this conversation as resolved.
Show resolved Hide resolved
- Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)

Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
// Deep copy and validate before using.
wrapped := func(ik InstrumentKind) aggregation.Aggregation {
a := selector(ik)
if a == nil {
return nil
}
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = DefaultAggregationSelector(ik)
Expand Down
146 changes: 146 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand Down Expand Up @@ -1811,3 +1812,148 @@ func BenchmarkInstrumentCreation(b *testing.B) {
sfHistogram, _ = meter.Float64Histogram("sync.float64.histogram")
}
}

func testNilAggregationSelector(InstrumentKind) aggregation.Aggregation {
return nil
}
func testDefaultAggregationSelector(InstrumentKind) aggregation.Aggregation {
return aggregation.Default{}
}
func testUndefinedTemporalitySelector(InstrumentKind) metricdata.Temporality {
return metricdata.Temporality(0)
}
func testInvalidTemporalitySelector(InstrumentKind) metricdata.Temporality {
return metricdata.Temporality(255)
}

type noErrorHandler struct {
t *testing.T
}

func (h noErrorHandler) Handle(err error) {
assert.NoError(h.t, err)
}

func TestMalformedSelectors(t *testing.T) {
type testCase struct {
name string
reader Reader
}
testCases := []testCase{
{
name: "nil aggregation selector",
reader: NewManualReader(WithAggregationSelector(testNilAggregationSelector)),
},
{
name: "nil aggregation selector periodic",
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testNilAggregationSelector}),
},
{
name: "default aggregation selector",
reader: NewManualReader(WithAggregationSelector(testDefaultAggregationSelector)),
},
{
name: "default aggregation selector periodic",
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testDefaultAggregationSelector}),
},
{
name: "undefined temporality selector",
reader: NewManualReader(WithTemporalitySelector(testUndefinedTemporalitySelector)),
},
{
name: "undefined temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testUndefinedTemporalitySelector}),
},
{
name: "invalid temporality selector",
reader: NewManualReader(WithTemporalitySelector(testInvalidTemporalitySelector)),
},
{
name: "invalid temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testInvalidTemporalitySelector}),
},
{
name: "both aggregation and temporality selector",
reader: NewManualReader(
WithAggregationSelector(testNilAggregationSelector),
WithTemporalitySelector(testUndefinedTemporalitySelector),
),
},
{
name: "both aggregation and temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{
aggregationFunc: testNilAggregationSelector,
temporalityFunc: testUndefinedTemporalitySelector,
}),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
origErrorHandler := global.GetErrorHandler()
defer global.SetErrorHandler(origErrorHandler)
global.SetErrorHandler(noErrorHandler{t})

defer func() {
_ = tt.reader.Shutdown(context.Background())
}()

meter := NewMeterProvider(WithReader(tt.reader)).Meter("TestNilAggregationSelector")

// Create All instruments, they should not error
aiCounter, err := meter.Int64ObservableCounter("observable.int64.counter")
require.NoError(t, err)
aiUpDownCounter, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
require.NoError(t, err)
aiGauge, err := meter.Int64ObservableGauge("observable.int64.gauge")
require.NoError(t, err)

afCounter, err := meter.Float64ObservableCounter("observable.float64.counter")
require.NoError(t, err)
afUpDownCounter, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
require.NoError(t, err)
afGauge, err := meter.Float64ObservableGauge("observable.float64.gauge")
require.NoError(t, err)

siCounter, err := meter.Int64Counter("sync.int64.counter")
require.NoError(t, err)
siUpDownCounter, err := meter.Int64UpDownCounter("sync.int64.up.down.counter")
require.NoError(t, err)
siHistogram, err := meter.Int64Histogram("sync.int64.histogram")
require.NoError(t, err)

sfCounter, err := meter.Float64Counter("sync.float64.counter")
require.NoError(t, err)
sfUpDownCounter, err := meter.Float64UpDownCounter("sync.float64.up.down.counter")
require.NoError(t, err)
sfHistogram, err := meter.Float64Histogram("sync.float64.histogram")
require.NoError(t, err)

callback := func(ctx context.Context, obs metric.Observer) error {
obs.ObserveInt64(aiCounter, 1)
obs.ObserveInt64(aiUpDownCounter, 1)
obs.ObserveInt64(aiGauge, 1)
obs.ObserveFloat64(afCounter, 1)
obs.ObserveFloat64(afUpDownCounter, 1)
obs.ObserveFloat64(afGauge, 1)
return nil
}
_, err = meter.RegisterCallback(callback, aiCounter, aiUpDownCounter, aiGauge, afCounter, afUpDownCounter, afGauge)
require.NoError(t, err)

siCounter.Add(context.Background(), 1)
siUpDownCounter.Add(context.Background(), 1)
siHistogram.Record(context.Background(), 1)
sfCounter.Add(context.Background(), 1)
sfUpDownCounter.Add(context.Background(), 1)
sfHistogram.Record(context.Background(), 1)

var rm metricdata.ResourceMetrics
err = tt.reader.Collect(context.Background(), &rm)
require.NoError(t, err)

require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 12)
})
}
}
5 changes: 5 additions & 0 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
}
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality {

// AggregationSelector selects the aggregation and the parameters to use for
// that aggregation based on the InstrumentKind.
//
// If the Aggregation returned is nil or DefaultAggregation, the selection from
// DefaultAggregationSelector will be used.
type AggregationSelector func(InstrumentKind) aggregation.Aggregation

// DefaultAggregationSelector returns the default aggregation and parameters
Expand Down