diff --git a/example/opencensus/main.go b/example/opencensus/main.go index 8afbd9e5b9b..c9709bad37a 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) { // registry or an OpenCensus view. func monitoring(exporter metric.Exporter) error { log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.") - reader := metric.NewPeriodicReader(exporter) // Register the OpenCensus metric Producer to add metrics from OpenCensus to the output. - reader.RegisterProducer(opencensus.NewMetricProducer()) + reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer())) metric.NewMeterProvider(metric.WithReader(reader)) log.Println("Registering a gauge metric using an OpenCensus registry.") diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 5677003550b..9c83f50dc10 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -34,7 +34,7 @@ type ManualReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer temporalitySelector TemporalitySelector aggregationSelector AggregationSelector @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader { r := &ManualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, + externalProducers: cfg.producers, } - r.externalProducers.Store([]Producer{}) return r } @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) { } } -// RegisterProducer stores the external Producer which enables the caller -// to read metrics on demand. -// -// This method is safe to call concurrently. -func (mr *ManualReader) RegisterProducer(p Producer) { - mr.mu.Lock() - defer mr.mu.Unlock() - if mr.isShutdown { - return - } - currentProducers := mr.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - mr.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -112,7 +95,7 @@ func (mr *ManualReader) Shutdown(context.Context) error { defer mr.mu.Unlock() mr.isShutdown = true // release references to Producer(s) - mr.externalProducers.Store([]Producer{}) + mr.externalProducers = nil err = nil }) return err @@ -150,7 +133,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr return err } var errs []error - for _, producer := range mr.externalProducers.Load().([]Producer) { + for _, producer := range mr.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -183,6 +166,7 @@ func (r *ManualReader) MarshalLog() interface{} { type manualReaderConfig struct { temporalitySelector TemporalitySelector aggregationSelector AggregationSelector + producers []Producer } // newManualReaderConfig returns a manualReaderConfig configured with options. diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 485c1b6eaf0..a16d448ca04 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -27,7 +27,13 @@ import ( ) func TestManualReader(t *testing.T) { - suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }}) + suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader { + var mopts []ManualReaderOption + for _, o := range opts { + mopts = append(mopts, o) + } + return NewManualReader(mopts...) + }}) } func BenchmarkManualReader(b *testing.B) { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3ea2c2f0fb2..e0492fc03e4 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -36,8 +36,9 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration + interval time.Duration + timeout time.Duration + producers []Producer } // newPeriodicReaderConfig returns a periodicReaderConfig configured with @@ -115,18 +116,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri conf := newPeriodicReaderConfig(options) ctx, cancel := context.WithCancel(context.Background()) r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), + interval: conf.interval, + timeout: conf.timeout, + exporter: exporter, + flushCh: make(chan chan error), + cancel: cancel, + done: make(chan struct{}), + externalProducers: conf.producers, rmPool: sync.Pool{ New: func() interface{} { return &metricdata.ResourceMetrics{} }}, } - r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -143,7 +144,7 @@ type PeriodicReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer interval time.Duration timeout time.Duration @@ -194,22 +195,6 @@ func (r *PeriodicReader) register(p sdkProducer) { } } -// RegisterProducer registers p as an external Producer of this reader. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) RegisterProducer(p Producer) { - r.mu.Lock() - defer r.mu.Unlock() - if r.isShutdown { - return - } - currentProducers := r.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - r.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -275,7 +260,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd return err } var errs []error - for _, producer := range r.externalProducers.Load().([]Producer) { + for _, producer := range r.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -351,7 +336,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { defer r.mu.Unlock() r.isShutdown = true // release references to Producer(s) - r.externalProducers.Store([]Producer{}) + r.externalProducers = nil }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index e5d8fd2ef30..92e50b92d3b 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -202,7 +202,7 @@ type periodicReaderTestSuite struct { } func (ts *periodicReaderTestSuite) SetupTest() { - ts.readerTestSuite.SetupTest() + ts.Reader = ts.Factory() e := &fnExporter{ exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError }, @@ -210,9 +210,8 @@ func (ts *periodicReaderTestSuite) SetupTest() { shutdownFunc: func(context.Context) error { return assert.AnError }, } - ts.ErrReader = NewPeriodicReader(e) + ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{})) ts.ErrReader.register(testSDKProducer{}) - ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -232,8 +231,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() { func TestPeriodicReader(t *testing.T) { suite.Run(t, &periodicReaderTestSuite{ readerTestSuite: &readerTestSuite{ - Factory: func() Reader { - return NewPeriodicReader(new(fnExporter)) + Factory: func(opts ...ReaderOption) Reader { + var popts []PeriodicReaderOption + for _, o := range opts { + popts = append(popts, o) + } + return NewPeriodicReader(new(fnExporter), popts...) }, }, }) @@ -290,9 +293,8 @@ func TestPeriodicReaderRun(t *testing.T) { }, } - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -319,9 +321,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -331,9 +332,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 95d8f0d2231..55ffba9e678 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -57,13 +57,6 @@ type Reader interface { // and send aggregated metric measurements. register(sdkProducer) - // RegisterProducer registers a an external Producer with this Reader. - // The Producer is used as a source of aggregated metric data which is - // incorporated into metrics collected from the SDK. - // - // This method needs to be concurrent safe. - RegisterProducer(Producer) - // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -167,3 +160,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation { } panic("unknown instrument kind") } + +// ReaderOption is an option which can be applied to manual or Periodic +// readers. +type ReaderOption interface { + PeriodicReaderOption + ManualReaderOption +} + +// WithProducers registers producers as an external Producer of metric data +// for this Reader. +func WithProducer(p Producer) ReaderOption { + return producerOption{p: p} +} + +type producerOption struct { + p Producer +} + +// applyManual returns a manualReaderConfig with option applied. +func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.producers = append(c.producers, o.p) + return c +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { + c.producers = append(c.producers, o.p) + return c +} diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index ca10da77340..8056008545b 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -34,7 +34,7 @@ import ( type readerTestSuite struct { suite.Suite - Factory func() Reader + Factory func(...ReaderOption) Reader Reader Reader } @@ -42,21 +42,19 @@ func (ts *readerTestSuite) SetupSuite() { otel.SetLogger(testr.New(ts.T())) } -func (ts *readerTestSuite) SetupTest() { - ts.Reader = ts.Factory() -} - func (ts *readerTestSuite) TearDownTest() { // Ensure Reader is allowed attempt to clean up. _ = ts.Reader.Shutdown(context.Background()) } func (ts *readerTestSuite) TestErrorForNotRegistered() { + ts.Reader = ts.Factory() err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{}) ts.ErrorIs(err, ErrReaderNotRegistered) } func (ts *readerTestSuite) TestSDKProducer() { + ts.Reader = ts.Factory() ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -65,8 +63,8 @@ func (ts *readerTestSuite) TestSDKProducer() { } func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) ts.NoError(err) @@ -74,9 +72,9 @@ func (ts *readerTestSuite) TestExternalProducer() { } func (ts *readerTestSuite) TestCollectAfterShutdown() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m := metricdata.ResourceMetrics{} @@ -86,22 +84,23 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { } func (ts *readerTestSuite) TestShutdownTwice() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { + ts.Reader = ts.Factory() p0 := testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { // Differentiate this producer from the second by returning an @@ -121,21 +120,19 @@ func (ts *readerTestSuite) TestMultipleRegister() { } func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { - ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer( - testExternalProducer{ + ts.Reader = ts.Factory( + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{}, assert.AnError }, - }, - ) - ts.Reader.RegisterProducer( - testExternalProducer{ + }), + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{testScopeMetricsB}, nil }, - }, + }), ) + ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -144,12 +141,12 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { } func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { *rm = metricdata.ResourceMetrics{} return assert.AnError }}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -158,11 +155,11 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { } func (ts *readerTestSuite) TestMethodConcurrency() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -190,11 +187,11 @@ func (ts *readerTestSuite) TestMethodConcurrency() { } func (ts *readerTestSuite) TestShutdownBeforeRegister() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(ctx, &m) @@ -203,6 +200,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { } func (ts *readerTestSuite) TestCollectNilResourceMetricError() { + ts.Reader = ts.Factory() ctx := context.Background() ts.Assert().Error(ts.Reader.Collect(ctx, nil)) }