Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multi-instrument callbacks to be unregistered #3522

Merged
merged 18 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)

### Removed

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)
Expand Down
2 changes: 1 addition & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
})
Expand Down
4 changes: 2 additions & 2 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() {
panic(err)
}

err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
func(ctx context.Context) {
// instrument.WithCallbackFunc(func(ctx context.Context) {
//Do Work to get the real memoryUsage
Expand All @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() {
gcCount, _ := meter.AsyncInt64().Counter("gcCount")
gcPause, _ := meter.SyncFloat64().Histogram("gcPause")

err := meter.RegisterCallback([]instrument.Asynchronous{
_, err := meter.RegisterCallback([]instrument.Asynchronous{
heapAlloc,
gcCount,
},
Expand Down
63 changes: 49 additions & 14 deletions metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package global // import "go.opentelemetry.io/otel/metric/internal/global"

import (
"container/list"
"context"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -109,7 +110,8 @@ type meter struct {

mtx sync.Mutex
instruments []delegatedInstrument
callbacks []delegatedCallback

registry list.List

delegate atomic.Value // metric.Meter
}
Expand All @@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
inst.setDelegate(meter)
}

for _, callback := range m.callbacks {
callback.setDelegate(meter)
for e := m.registry.Front(); e != nil; e = e.Next() {
r := e.Value.(*registration)
r.setDelegate(meter)
m.registry.Remove(e)
}

m.instruments = nil
m.callbacks = nil
m.registry.Init()
}

// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
Expand All @@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, function)
return del.RegisterCallback(insts, f)
}

m.mtx.Lock()
defer m.mtx.Unlock()
m.callbacks = append(m.callbacks, delegatedCallback{
instruments: insts,
function: function,
})

return nil
reg := &registration{instruments: insts, function: f}
e := m.registry.PushBack(reg)
reg.unreg = func() error {
m.mtx.Lock()
_ = m.registry.Remove(e)
m.mtx.Unlock()
return nil
}
return reg, nil
}

type wrapped interface {
Expand Down Expand Up @@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return (*sfInstProvider)(m)
}

type delegatedCallback struct {
type registration struct {
instruments []instrument.Asynchronous
function func(context.Context)

unreg func() error
unregMu sync.Mutex
}

func (c *delegatedCallback) setDelegate(m metric.Meter) {
func (c *registration) setDelegate(m metric.Meter) {
insts := unwrapInstruments(c.instruments)
err := m.RegisterCallback(insts, c.function)

c.unregMu.Lock()
defer c.unregMu.Unlock()

if c.unreg == nil {
// Unregister already called.
return
}

reg, err := m.RegisterCallback(insts, c.function)
if err != nil {
otel.Handle(err)
}

c.unreg = reg.Unregister
}

func (c *registration) Unregister() error {
c.unregMu.Lock()
defer c.unregMu.Unlock()
if c.unreg == nil {
// Unregister already called.
return nil
}

var err error
err, c.unreg = c.unreg(), nil
return err
}

type afInstProvider meter
Expand Down
84 changes: 81 additions & 3 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.SyncInt64().Counter(name)
_, _ = mtr.SyncInt64().UpDownCounter(name)
_, _ = mtr.SyncInt64().Histogram(name)
_ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
if !once {
wg.Done()
once = true
Expand All @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) {
close(finish)
}

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
finish := make(chan struct{})
go func() {
for i, once := 0, false; ; i++ {
_ = reg.Unregister()
if !once {
wg.Done()
once = true
}
select {
case <-finish:
return
default:
}
}
}()
_ = reg.Unregister()

wg.Wait()
mtr.setDelegate(metric.NewNoopMeterProvider())
close(finish)
}

func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand All @@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
_, err = m.AsyncInt64().Gauge("test_Async_Gauge")
assert.NoError(t, err)

require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
afcounter.Observe(ctx, 3)
}))
})
require.NoError(t, err)

sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand Down Expand Up @@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) {
assert.IsType(t, &afCounter{}, actr)
assert.Equal(t, 1, mp.count)
}

func TestRegistrationDelegation(t *testing.T) {
// globalMeterProvider := otel.GetMeterProvider
globalMeterProvider := &meterProvider{}

m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
require.IsType(t, &meter{}, m)
mImpl := m.(*meter)

actr, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called0 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
// This means reg0 should not be delegated.
assert.NoError(t, reg0.Unregister())
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called1 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")

mp := &testMeterProvider{}

// otel.SetMeterProvider(mp)
globalMeterProvider.setDelegate(mp)

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
require.False(t, called0, "pre-delegation unregistered callback called")
require.True(t, called1, "callback not called")

called1 = false
assert.NoError(t, reg1.Unregister(), "unregister second callback")

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
assert.False(t, called1, "unregistered callback called")

assert.NotPanics(t, func() {
assert.NoError(t, reg1.Unregister(), "duplicate unregister calls")
})
}
21 changes: 19 additions & 2 deletions metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
m.callbacks = append(m.callbacks, function)
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
m.callbacks = append(m.callbacks, f)
return testReg{
f: func(idx int) func() {
return func() { m.callbacks[idx] = nil }
}(len(m.callbacks) - 1),
}, nil
}

type testReg struct {
f func()
}

func (r testReg) Unregister() error {
r.f()
return nil
}

Expand All @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider {
func (m *testMeter) collect() {
ctx := context.Background()
for _, f := range m.callbacks {
if f == nil {
// Unregister.
continue
}
f(ctx)
}
}
Expand Down
28 changes: 22 additions & 6 deletions metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,30 @@ type Meter interface {
// To Observe data with instruments it must be registered in a callback.
AsyncFloat64() asyncfloat64.InstrumentProvider

// RegisterCallback captures the function that will be called during Collect.
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error

// SyncInt64 is the namespace for the Synchronous Integer instruments
SyncInt64() syncint64.InstrumentProvider
// SyncFloat64 is the namespace for the Synchronous Float instruments
SyncFloat64() syncfloat64.InstrumentProvider

// RegisterCallback registers f to be called during the collection of a
// measurement cycle.
//
// If Unregister of the returned Registration is called, f needs to be
// unregistered and not called during collection.
//
// The instruments f is registered with are the only instruments that f may
// observe values for.
//
// If no instruments are passed, f should not be registered nor called
// during collection.
RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error)
}

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
type Registration interface {
// Unregister removes the callback registration from a Meter.
//
// This method needs to be idempotent and concurrent safe.
Unregister() error
}
8 changes: 6 additions & 2 deletions metric/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider {
}

// RegisterCallback creates a register callback that does not record any metrics.
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error {
return nil
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) {
return noopReg{}, nil
}

type noopReg struct{}

func (noopReg) Unregister() error { return nil }

type nonrecordingAsyncFloat64Instrument struct {
instrument.Asynchronous
}
Expand Down
15 changes: 11 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
for _, inst := range insts {
// Only register if at least one instrument has a non-drop aggregation.
// Otherwise, calling f during collection will be wasted computation.
Expand All @@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context
}
}
// All insts use drop aggregation.
return nil
return noopRegister{}, nil
}

func (m *meter) registerCallback(f func(context.Context)) error {
m.pipes.registerCallback(f)
type noopRegister struct{}

func (noopRegister) Unregister() error {
return nil
}

type callback func(context.Context)

func (m *meter) registerCallback(c callback) (metric.Registration, error) {
return m.pipes.registerCallback(c), nil
}

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{m.instProviderInt64}
Expand Down
Loading