Skip to content

Commit

Permalink
Change metric.Producer to be an Option on Reader (#4346)
Browse files Browse the repository at this point in the history
* metric.Producer can be passed as an argument to Reader using
WithProducer

* reproduce data race

* revert to atomic.Value
  • Loading branch information
dashpole authored Aug 11, 2023
1 parent 7b9fb7a commit fe51391
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377)
- Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408)
- Increase instrument name maximum length from 63 to 255 characters. (#4434)
- Add `go.opentelemetry.op/otel/sdk/metric.WithProducer` as an Option for metric.NewManualReader and metric.NewPeriodicReader, and remove `Reader.RegisterProducer()` (#4346)

### Removed

Expand Down
3 changes: 1 addition & 2 deletions example/opencensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) {
// registry or an OpenCensus view.
func monitoring(exporter metric.Exporter) error {
log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.")
reader := metric.NewPeriodicReader(exporter)
// Register the OpenCensus metric Producer to add metrics from OpenCensus to the output.
reader.RegisterProducer(opencensus.NewMetricProducer())
reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer()))
metric.NewMeterProvider(metric.WithReader(reader))

log.Println("Registering a gauge metric using an OpenCensus registry.")
Expand Down
20 changes: 2 additions & 18 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader {
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
r.externalProducers.Store(cfg.producers)
return r
}

Expand All @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) {
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
//
// This method is safe to call concurrently.
func (mr *ManualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
return
}
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand Down Expand Up @@ -176,6 +159,7 @@ func (r *ManualReader) MarshalLog() interface{} {
type manualReaderConfig struct {
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
producers []Producer
}

// newManualReaderConfig returns a manualReaderConfig configured with options.
Expand Down
8 changes: 7 additions & 1 deletion sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ import (
)

func TestManualReader(t *testing.T) {
suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }})
suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader {
var mopts []ManualReaderOption
for _, o := range opts {
mopts = append(mopts, o)
}
return NewManualReader(mopts...)
}})
}

func BenchmarkManualReader(b *testing.B) {
Expand Down
23 changes: 4 additions & 19 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
interval time.Duration
timeout time.Duration
producers []Producer
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store([]Producer{})
r.externalProducers.Store(conf.producers)

go func() {
defer func() { close(r.done) }()
Expand Down Expand Up @@ -197,22 +198,6 @@ func (r *PeriodicReader) register(p sdkProducer) {
}
}

// RegisterProducer registers p as an external Producer of this reader.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
return
}
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand Down
45 changes: 20 additions & 25 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,16 @@ type periodicReaderTestSuite struct {
}

func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()
ts.Reader = ts.Factory()

e := &fnExporter{
exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{}))
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand All @@ -233,8 +232,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
func TestPeriodicReader(t *testing.T) {
suite.Run(t, &periodicReaderTestSuite{
readerTestSuite: &readerTestSuite{
Factory: func() Reader {
return NewPeriodicReader(new(fnExporter))
Factory: func(opts ...ReaderOption) Reader {
var popts []PeriodicReaderOption
for _, o := range opts {
popts = append(popts, o)
}
return NewPeriodicReader(new(fnExporter), popts...)
},
},
})
Expand Down Expand Up @@ -291,9 +294,8 @@ func TestPeriodicReaderRun(t *testing.T) {
},
}

r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand All @@ -320,9 +322,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -333,7 +334,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -345,7 +346,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return nil
}})
r.RegisterProducer(testExternalProducer{})
assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")

Expand All @@ -356,9 +356,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -368,7 +366,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
},
})
}))
r.register(testSDKProducer{})
assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")

Expand All @@ -378,17 +377,16 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})

t.Run("Shutdown timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -400,17 +398,14 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return nil
}})
r.RegisterProducer(testExternalProducer{})
assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})

t.Run("Shutdown timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -420,17 +415,17 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
},
})
}))
r.register(testSDKProducer{})
assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})
}

func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
ctx := context.Background()
r := NewPeriodicReader(new(fnExporter))
r := NewPeriodicReader(new(fnExporter), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.ForceFlush(ctx))
}
Expand Down
36 changes: 29 additions & 7 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ type Reader interface {
// and send aggregated metric measurements.
register(sdkProducer)

// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
//
// This method needs to be concurrent safe.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
//
// This method needs to be concurrent safe with itself and all the other
Expand Down Expand Up @@ -166,3 +159,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation {
}
panic("unknown instrument kind")
}

// ReaderOption is an option which can be applied to manual or Periodic
// readers.
type ReaderOption interface {
PeriodicReaderOption
ManualReaderOption
}

// WithProducers registers producers as an external Producer of metric data
// for this Reader.
func WithProducer(p Producer) ReaderOption {
return producerOption{p: p}
}

type producerOption struct {
p Producer
}

// applyManual returns a manualReaderConfig with option applied.
func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.producers = append(c.producers, o.p)
return c
}

// applyPeriodic returns a periodicReaderConfig with option applied.
func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
c.producers = append(c.producers, o.p)
return c
}
Loading

0 comments on commit fe51391

Please sign in to comment.