Skip to content

Commit

Permalink
ensure the initOnce is always called; handle multiple errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Sep 21, 2021
1 parent ea7bc59 commit 3356eb5
Showing 1 changed file with 43 additions and 27 deletions.
70 changes: 43 additions & 27 deletions sdk/metric/controller/basic/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio
newTmp := &initMeterOnce{}
m, _ := c.libraries.LoadOrStore(library, newTmp)
mo := m.(*initMeterOnce)

return metric.WrapMeterImpl(c.initializeUniqueMeter(library, mo))
}

func (c *Controller) initializeUniqueMeter(library instrumentation.Library, mo *initMeterOnce) *registry.UniqueInstrumentMeterImpl {
mo.initOnce.Do(func() {
checkpointer := c.checkpointerFactory.NewCheckpointer()
accumulator := sdk.NewAccumulator(checkpointer)
Expand All @@ -104,8 +109,19 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio
library: library,
})
})
return mo.unique
}

return metric.WrapMeterImpl(mo.unique)
// syncMapKeyValueToAccuulatorCheckpointer encapsulates the invariants
// placed on the libraries sync.Map, which is a
// map[instrumentation.Library]*initMeterOnce where the
// registry.UniqueInstrumentMeter's implementation is a
// *accumulatorCheckpointer.
func (c *Controller) syncMapKeyValueToAccumulatorCheckpointer(key, value interface{}) *accumulatorCheckpointer {
return c.initializeUniqueMeter(
key.(instrumentation.Library),
value.(*initMeterOnce),
).MeterImpl().(*accumulatorCheckpointer)
}

type accumulatorCheckpointer struct {
Expand Down Expand Up @@ -245,35 +261,27 @@ func (c *Controller) collect(ctx context.Context) error {
return c.export(ctx)
}

// accumulatorList returns a snapshot of current accumulators
// registered to this controller. This briefly locks the controller.
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
c.lock.Lock()
defer c.lock.Unlock()

var r []*accumulatorCheckpointer
c.libraries.Range(func(_, value interface{}) bool {
mo := value.(*initMeterOnce)
acc, ok := mo.unique.MeterImpl().(*accumulatorCheckpointer)
if ok {
r = append(r, acc)
}
return true
})
return r
}

// checkpoint calls the Accumulator and Checkpointer interfaces to
// compute the Reader. This applies the configured collection
// timeout. Note that this does not try to cancel a Collect or Export
// when Stop() is called.
func (c *Controller) checkpoint(ctx context.Context) error {
for _, impl := range c.accumulatorList() {
if err := c.checkpointSingleAccumulator(ctx, impl); err != nil {
return err
var errs []error
c.libraries.Range(func(key, value interface{}) bool {
acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value)

if err := c.checkpointSingleAccumulator(ctx, acPair); err != nil {
errs = append(errs, err)
}
return false
})
if errs == nil {
return nil
}
return nil
if len(errs) == 1 {
return errs[0]
}
return fmt.Errorf("multiple checkpoint errors %w %v", errs[0], errs[1:])
}

// checkpointSingleAccumulator checkpoints a single instrumentation
Expand Down Expand Up @@ -329,18 +337,26 @@ func (c *Controller) export(ctx context.Context) error {

// ForEach implements export.InstrumentationLibraryReader.
func (c *Controller) ForEach(readerFunc func(l instrumentation.Library, r export.Reader) error) error {
for _, acPair := range c.accumulatorList() {
var errs []error
c.libraries.Range(func(key, value interface{}) bool {
acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value)
reader := acPair.checkpointer.Reader()
// TODO: We should not fail fast; instead accumulate errors.
if err := func() error {
reader.RLock()
defer reader.RUnlock()
return readerFunc(acPair.library, reader)
}(); err != nil {
return err
errs = append(errs, err)
}
return false
})
if errs == nil {
return nil
}
return nil
if len(errs) == 1 {
return errs[0]
}
return fmt.Errorf("multiple ForEach errors %w %v", errs[0], errs[1:])
}

// IsRunning returns true if the controller was started via Start(),
Expand Down

0 comments on commit 3356eb5

Please sign in to comment.