From 73e512cb64e24e7c6cb7a3c417dfcb9eb2576258 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 25 May 2022 09:38:11 -0700 Subject: [PATCH] Unify reader implementations (#2923) * Unify reader implementations Use an atomic.Value to manage concurrency without a lock. * Lint --- sdk/metric/go.mod | 1 + sdk/metric/manual_reader.go | 50 ++++++++++++++++++++--------------- sdk/metric/periodic_reader.go | 20 ++++---------- sdk/metric/reader.go | 14 ++++++++++ sdk/metric/reader_test.go | 6 +++++ 5 files changed, 54 insertions(+), 37 deletions(-) diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index 933ed548e2f..a818f0ec846 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/sdk/metric go 1.16 require ( + github.com/go-logr/logr v1.2.3 github.com/stretchr/testify v1.7.1 go.opentelemetry.io/otel v1.7.0 go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000 diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index a93607bf537..485f09faa8e 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -19,7 +19,9 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" "sync" + "sync/atomic" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/export" @@ -28,9 +30,8 @@ import ( // manualReader is a a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - lock sync.Mutex - producer producer - shutdown bool + producer atomic.Value + shutdownOnce sync.Once } // Compile time check the manualReader implements Reader. @@ -44,14 +45,11 @@ func NewManualReader() Reader { // register stores the Producer which enables the caller to read // metrics on demand. func (mr *manualReader) register(p producer) { - mr.lock.Lock() - defer mr.lock.Unlock() - if mr.producer != nil { - msg := "did not register manualReader" + // Only register once. If producer is already set, do nothing. + if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + msg := "did not register manual reader" global.Error(errDuplicateRegister, msg) - return } - mr.producer = p } // ForceFlush is a no-op, it always returns nil. @@ -61,25 +59,33 @@ func (mr *manualReader) ForceFlush(context.Context) error { // Shutdown closes any connections and frees any resources used by the reader. func (mr *manualReader) Shutdown(context.Context) error { - mr.lock.Lock() - defer mr.lock.Unlock() - if mr.shutdown { - return ErrReaderShutdown - } - mr.shutdown = true - return nil + err := ErrReaderShutdown + mr.shutdownOnce.Do(func() { + // Any future call to Collect will now return ErrReaderShutdown. + mr.producer.Store(produceHolder{ + produce: shutdownProducer{}.produce, + }) + err = nil + }) + return err } // Collect gathers all metrics from the SDK, calling any callbacks necessary. // Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) { - mr.lock.Lock() - defer mr.lock.Unlock() - if mr.producer == nil { + p := mr.producer.Load() + if p == nil { return export.Metrics{}, ErrReaderNotRegistered } - if mr.shutdown { - return export.Metrics{}, ErrReaderShutdown + + ph, ok := p.(produceHolder) + if !ok { + // The atomic.Value is entirely in the periodicReader's control so + // this should never happen. In the unforeseen case that this does + // happen, return an error instead of panicking so a users code does + // not halt in the processes. + err := fmt.Errorf("manual reader: invalid producer: %T", p) + return export.Metrics{}, err } - return mr.producer.produce(ctx) + return ph.produce(ctx) } diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 0b54874fd34..75585fe9178 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -25,6 +25,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/export" ) @@ -166,7 +167,10 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { // register registers p as the producer of this reader. func (r *periodicReader) register(p producer) { // Only register once. If producer is already set, do nothing. - r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) + if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + msg := "did not register periodic reader" + global.Error(errDuplicateRegister, msg) + } } // Collect gathers and returns all metric data related to the Reader from @@ -214,17 +218,3 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { }) return err } - -// produceHolder is used as an atomic.Value to wrap the non-concrete producer -// type. -type produceHolder struct { - produce func(context.Context) (export.Metrics, error) -} - -// shutdownProducer produces an ErrReaderShutdown error always. -type shutdownProducer struct{} - -// produce returns an ErrReaderShutdown error. -func (p shutdownProducer) produce(context.Context) (export.Metrics, error) { - return export.Metrics{}, ErrReaderShutdown -} diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index e73c000a135..7964e08d383 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -87,3 +87,17 @@ type producer interface { // This method is safe to call concurrently. produce(context.Context) (export.Metrics, error) } + +// produceHolder is used as an atomic.Value to wrap the non-concrete producer +// type. +type produceHolder struct { + produce func(context.Context) (export.Metrics, error) +} + +// shutdownProducer produces an ErrReaderShutdown error always. +type shutdownProducer struct{} + +// produce returns an ErrReaderShutdown error. +func (p shutdownProducer) produce(context.Context) (export.Metrics, error) { + return export.Metrics{}, ErrReaderShutdown +} diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 6ca9f94b5bd..cb6a1761805 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -22,9 +22,11 @@ import ( "sync" "testing" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/metric/export" ) @@ -35,6 +37,10 @@ type readerTestSuite struct { Reader Reader } +func (ts *readerTestSuite) SetupSuite() { + otel.SetLogger(testr.New(ts.T())) +} + func (ts *readerTestSuite) SetupTest() { ts.Reader = ts.Factory() }