Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict Meters to only register and collect instruments it created #4333

Merged
merged 8 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)

### Fixed

Expand Down
20 changes: 11 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

Expand All @@ -296,28 +296,30 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

meter *meter
measures []aggregate.Measure[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
description: desc,
kind: kind,
unit: u,
scope: scope,
scope: m.scope,
},
meter: m,
measures: meas,
}
}
Expand All @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
// and nil if it should. An errEmptyAgg error is returned if o is effectively a
// no-op because it does not have any aggregators. Also, an error is returned
// if scope defines a Meter other than the one o was created by.
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
func (o *observable[N]) registerable(m *meter) error {
if len(o.measures) == 0 {
return errEmptyAgg
}
if scope != o.scope {
if m != o.meter {
return fmt.Errorf(
"invalid registration: observable %q from Meter %q, registered with Meter %q",
o.name,
o.scope.Name,
scope.Name,
m.scope.Name,
)
}
return nil
Expand Down
106 changes: 46 additions & 60 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64IP *int64InstProvider
float64IP *float64InstProvider
int64Resolver resolver[int64]
float64Resolver resolver[float64]
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
Expand All @@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
var viewCache cache[string, streamID]

return &meter{
scope: s,
pipes: p,
int64IP: newInt64InstProvider(s, p, &viewCache),
float64IP: newFloat64InstProvider(s, p, &viewCache),
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}

Expand All @@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil)
func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
cfg := metric.NewInt64CounterConfig(options...)
const kind = InstrumentKindCounter
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
cfg := metric.NewInt64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
cfg := metric.NewFloat64CounterConfig(options...)
const kind = InstrumentKindCounter
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
cfg := metric.NewFloat64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand Down Expand Up @@ -301,15 +307,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)

switch o := inst.(type) {
case int64Observable:
if err := o.registerable(m.scope); err != nil {
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
}
continue
}
reg.registerInt64(o.observablID)
case float64Observable:
if err := o.registerable(m.scope); err != nil {
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
}
Expand All @@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
}

if err := errs.errorOrNil(); err != nil {
return nil, err
}

err := errs.errorOrNil()
if reg.len() == 0 {
// All insts use drop aggregation.
return noopRegister{}, nil
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

cback := func(ctx context.Context) error {
return f(ctx, reg)
}
return m.pipes.registerMultiCallback(cback), nil
// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
}

type observer struct {
Expand Down Expand Up @@ -441,66 +443,50 @@ func (noopRegister) Unregister() error {
}

// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[int64]
}
type int64InstProvider struct{ *meter }

func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider {
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
return p.int64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[float64]
}

func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider {
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}
type float64InstProvider struct{ *meter }

func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
return p.float64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *int64InstProvider }
type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
return newInt64Observable(p.scope, kind, name, desc, u, aggs), err
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
Expand Down Expand Up @@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
o.observe(val, c.Attributes())
}

type float64ObservProvider struct{ *float64InstProvider }
type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
Expand Down
Loading