diff --git a/CHANGELOG.md b/CHANGELOG.md index 690bc7bdc33..72c11e7a517 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Accept 201 to 299 HTTP status as success in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4365) - Document the `Temporality` and `Aggregation` methods of the `"go.opentelemetry.io/otel/sdk/metric".Exporter"` need to be concurrent safe. (#4381) - Expand the set of units supported by the prometheus exporter, and don't add unit suffixes if they are already present in `go.opentelemetry.op/otel/exporters/prometheus` (#4374) +- Add `go.opentelemetry.op/otel/sdk/metric.WithProducer` as an Option for metric.NewManualReader and metric.NewPeriodicReader (#4346) ### Changed @@ -44,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) - `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377) - Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408) +- Add `go.opentelemetry.op/otel/sdk/metric.WithProducer` as an Option for metric.NewManualReader and metric.NewPeriodicReader, and remove `Reader.RegisterProducer()` (#4346) ### Fixed diff --git a/example/opencensus/main.go b/example/opencensus/main.go index 8afbd9e5b9b..c9709bad37a 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) { // registry or an OpenCensus view. func monitoring(exporter metric.Exporter) error { log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.") - reader := metric.NewPeriodicReader(exporter) // Register the OpenCensus metric Producer to add metrics from OpenCensus to the output. - reader.RegisterProducer(opencensus.NewMetricProducer()) + reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer())) metric.NewMeterProvider(metric.WithReader(reader)) log.Println("Registering a gauge metric using an OpenCensus registry.") diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 898af86edc0..2091613752b 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -34,7 +34,7 @@ type ManualReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer temporalitySelector TemporalitySelector aggregationSelector AggregationSelector @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader { r := &ManualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, + externalProducers: cfg.producers, } - r.externalProducers.Store([]Producer{}) return r } @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) { } } -// RegisterProducer stores the external Producer which enables the caller -// to read metrics on demand. -// -// This method is safe to call concurrently. -func (mr *ManualReader) RegisterProducer(p Producer) { - mr.mu.Lock() - defer mr.mu.Unlock() - if mr.isShutdown { - return - } - currentProducers := mr.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - mr.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -112,7 +95,7 @@ func (mr *ManualReader) Shutdown(context.Context) error { defer mr.mu.Unlock() mr.isShutdown = true // release references to Producer(s) - mr.externalProducers.Store([]Producer{}) + mr.externalProducers = nil err = nil }) return err @@ -150,7 +133,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr return err } var errs []error - for _, producer := range mr.externalProducers.Load().([]Producer) { + for _, producer := range mr.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -183,6 +166,7 @@ func (r *ManualReader) MarshalLog() interface{} { type manualReaderConfig struct { temporalitySelector TemporalitySelector aggregationSelector AggregationSelector + producers []Producer } // newManualReaderConfig returns a manualReaderConfig configured with options. diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 1c2f63cf504..210a66c1bbe 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -27,7 +27,13 @@ import ( ) func TestManualReader(t *testing.T) { - suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }}) + suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader { + var mopts []ManualReaderOption + for _, o := range opts { + mopts = append(mopts, o) + } + return NewManualReader(mopts...) + }}) } func BenchmarkManualReader(b *testing.B) { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index f62a2ae41e3..3f7fcb41b50 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -36,8 +36,9 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration + interval time.Duration + timeout time.Duration + producers []Producer } // newPeriodicReaderConfig returns a periodicReaderConfig configured with @@ -118,18 +119,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri conf := newPeriodicReaderConfig(options) ctx, cancel := context.WithCancel(context.Background()) r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), + interval: conf.interval, + timeout: conf.timeout, + exporter: exporter, + flushCh: make(chan chan error), + cancel: cancel, + done: make(chan struct{}), + externalProducers: conf.producers, rmPool: sync.Pool{ New: func() interface{} { return &metricdata.ResourceMetrics{} }}, } - r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -146,7 +147,7 @@ type PeriodicReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer interval time.Duration timeout time.Duration @@ -197,22 +198,6 @@ func (r *PeriodicReader) register(p sdkProducer) { } } -// RegisterProducer registers p as an external Producer of this reader. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) RegisterProducer(p Producer) { - r.mu.Lock() - defer r.mu.Unlock() - if r.isShutdown { - return - } - currentProducers := r.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - r.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -278,7 +263,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd return err } var errs []error - for _, producer := range r.externalProducers.Load().([]Producer) { + for _, producer := range r.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -368,7 +353,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { defer r.mu.Unlock() r.isShutdown = true // release references to Producer(s) - r.externalProducers.Store([]Producer{}) + r.externalProducers = nil }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 0a34b7e9951..1f2da52235f 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -202,7 +202,7 @@ type periodicReaderTestSuite struct { } func (ts *periodicReaderTestSuite) SetupTest() { - ts.readerTestSuite.SetupTest() + ts.Reader = ts.Factory() e := &fnExporter{ exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError }, @@ -210,9 +210,8 @@ func (ts *periodicReaderTestSuite) SetupTest() { shutdownFunc: func(context.Context) error { return assert.AnError }, } - ts.ErrReader = NewPeriodicReader(e) + ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{})) ts.ErrReader.register(testSDKProducer{}) - ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -232,8 +231,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() { func TestPeriodicReader(t *testing.T) { suite.Run(t, &periodicReaderTestSuite{ readerTestSuite: &readerTestSuite{ - Factory: func() Reader { - return NewPeriodicReader(new(fnExporter)) + Factory: func(opts ...ReaderOption) Reader { + var popts []PeriodicReaderOption + for _, o := range opts { + popts = append(popts, o) + } + return NewPeriodicReader(new(fnExporter), popts...) }, }, }) @@ -290,9 +293,8 @@ func TestPeriodicReaderRun(t *testing.T) { }, } - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -319,9 +321,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -332,7 +333,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -344,7 +345,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return nil }}) - r.RegisterProducer(testExternalProducer{}) assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") @@ -355,9 +355,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) - r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -367,7 +365,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return []metricdata.ScopeMetrics{testScopeMetricsA}, nil }, - }) + })) + r.register(testSDKProducer{}) assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") @@ -377,9 +376,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) @@ -387,7 +385,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -399,7 +397,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return nil }}) - r.RegisterProducer(testExternalProducer{}) assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") }) @@ -407,9 +404,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) - r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -419,7 +414,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return []metricdata.ScopeMetrics{testScopeMetricsA}, nil }, - }) + })) + r.register(testSDKProducer{}) assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") }) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index da68ef33686..8f52bfdeea9 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -57,13 +57,6 @@ type Reader interface { // and send aggregated metric measurements. register(sdkProducer) - // RegisterProducer registers a an external Producer with this Reader. - // The Producer is used as a source of aggregated metric data which is - // incorporated into metrics collected from the SDK. - // - // This method needs to be concurrent safe. - RegisterProducer(Producer) - // temporality reports the Temporality for the instrument kind provided. // // This method needs to be concurrent safe with itself and all the other @@ -176,3 +169,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation { } panic("unknown instrument kind") } + +// ReaderOption is an option which can be applied to manual or Periodic +// readers. +type ReaderOption interface { + PeriodicReaderOption + ManualReaderOption +} + +// WithProducers registers producers as an external Producer of metric data +// for this Reader. +func WithProducer(p Producer) ReaderOption { + return producerOption{p: p} +} + +type producerOption struct { + p Producer +} + +// applyManual returns a manualReaderConfig with option applied. +func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.producers = append(c.producers, o.p) + return c +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { + c.producers = append(c.producers, o.p) + return c +} diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 8904d68ee40..9d29c8b5f2b 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -34,7 +34,7 @@ import ( type readerTestSuite struct { suite.Suite - Factory func() Reader + Factory func(...ReaderOption) Reader Reader Reader } @@ -42,21 +42,19 @@ func (ts *readerTestSuite) SetupSuite() { otel.SetLogger(testr.New(ts.T())) } -func (ts *readerTestSuite) SetupTest() { - ts.Reader = ts.Factory() -} - func (ts *readerTestSuite) TearDownTest() { // Ensure Reader is allowed attempt to clean up. _ = ts.Reader.Shutdown(context.Background()) } func (ts *readerTestSuite) TestErrorForNotRegistered() { + ts.Reader = ts.Factory() err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{}) ts.ErrorIs(err, ErrReaderNotRegistered) } func (ts *readerTestSuite) TestSDKProducer() { + ts.Reader = ts.Factory() ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -65,8 +63,8 @@ func (ts *readerTestSuite) TestSDKProducer() { } func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) ts.NoError(err) @@ -74,9 +72,9 @@ func (ts *readerTestSuite) TestExternalProducer() { } func (ts *readerTestSuite) TestCollectAfterShutdown() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m := metricdata.ResourceMetrics{} @@ -86,22 +84,23 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { } func (ts *readerTestSuite) TestShutdownTwice() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { + ts.Reader = ts.Factory() p0 := testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { // Differentiate this producer from the second by returning an @@ -121,21 +120,19 @@ func (ts *readerTestSuite) TestMultipleRegister() { } func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { - ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer( - testExternalProducer{ + ts.Reader = ts.Factory( + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{}, assert.AnError }, - }, - ) - ts.Reader.RegisterProducer( - testExternalProducer{ + }), + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{testScopeMetricsB}, nil }, - }, + }), ) + ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -144,12 +141,12 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { } func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { *rm = metricdata.ResourceMetrics{} return assert.AnError }}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -158,11 +155,11 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { } func (ts *readerTestSuite) TestMethodConcurrentSafe() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -202,11 +199,11 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() { } func (ts *readerTestSuite) TestShutdownBeforeRegister() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(ctx, &m) @@ -215,6 +212,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { } func (ts *readerTestSuite) TestCollectNilResourceMetricError() { + ts.Reader = ts.Factory() ctx := context.Background() ts.Assert().Error(ts.Reader.Collect(ctx, nil)) }