Skip to content

Commit

Permalink
Unify reader implementations (#2923)
Browse files Browse the repository at this point in the history
* Unify reader implementations

Use an atomic.Value to manage concurrency without a lock.

* Lint
  • Loading branch information
MrAlias authored May 25, 2022
1 parent 46bf817 commit 73e512c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 37 deletions.
1 change: 1 addition & 0 deletions sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/sdk/metric
go 1.16

require (
github.com/go-logr/logr v1.2.3
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000
Expand Down
50 changes: 28 additions & 22 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand All @@ -28,9 +30,8 @@ import (
// manualReader is a a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
lock sync.Mutex
producer producer
shutdown bool
producer atomic.Value
shutdownOnce sync.Once
}

// Compile time check the manualReader implements Reader.
Expand All @@ -44,14 +45,11 @@ func NewManualReader() Reader {
// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
mr.lock.Lock()
defer mr.lock.Unlock()
if mr.producer != nil {
msg := "did not register manualReader"
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
return
}
mr.producer = p
}

// ForceFlush is a no-op, it always returns nil.
Expand All @@ -61,25 +59,33 @@ func (mr *manualReader) ForceFlush(context.Context) error {

// Shutdown closes any connections and frees any resources used by the reader.
func (mr *manualReader) Shutdown(context.Context) error {
mr.lock.Lock()
defer mr.lock.Unlock()
if mr.shutdown {
return ErrReaderShutdown
}
mr.shutdown = true
return nil
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
mr.lock.Lock()
defer mr.lock.Unlock()
if mr.producer == nil {
p := mr.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
}
if mr.shutdown {
return export.Metrics{}, ErrReaderShutdown

ph, ok := p.(produceHolder)
if !ok {
// The atomic.Value is entirely in the periodicReader's control so
// this should never happen. In the unforeseen case that this does
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return export.Metrics{}, err
}
return mr.producer.produce(ctx)
return ph.produce(ctx)
}
20 changes: 5 additions & 15 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/export"
)

Expand Down Expand Up @@ -166,7 +167,10 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
// Only register once. If producer is already set, do nothing.
r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce})
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}

// Collect gathers and returns all metric data related to the Reader from
Expand Down Expand Up @@ -214,17 +218,3 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
})
return err
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (export.Metrics, error)
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
}
14 changes: 14 additions & 0 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,17 @@ type producer interface {
// This method is safe to call concurrently.
produce(context.Context) (export.Metrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (export.Metrics, error)
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
}
6 changes: 6 additions & 0 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"sync"
"testing"

"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric/export"
)

Expand All @@ -35,6 +37,10 @@ type readerTestSuite struct {
Reader Reader
}

func (ts *readerTestSuite) SetupSuite() {
otel.SetLogger(testr.New(ts.T()))
}

func (ts *readerTestSuite) SetupTest() {
ts.Reader = ts.Factory()
}
Expand Down

0 comments on commit 73e512c

Please sign in to comment.