Skip to content

Commit

Permalink
prototype for metric.Producer as an argument to Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jul 20, 2023
1 parent e08359f commit 1740d9c
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 90 deletions.
3 changes: 1 addition & 2 deletions example/opencensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) {
// registry or an OpenCensus view.
func monitoring(exporter metric.Exporter) error {
log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.")
reader := metric.NewPeriodicReader(exporter)
// Register the OpenCensus metric Producer to add metrics from OpenCensus to the output.
reader.RegisterProducer(opencensus.NewMetricProducer())
reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer()))
metric.NewMeterProvider(metric.WithReader(reader))

log.Println("Registering a gauge metric using an OpenCensus registry.")
Expand Down
26 changes: 5 additions & 21 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ManualReader struct {

mu sync.Mutex
isShutdown bool
externalProducers atomic.Value
externalProducers []Producer

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
Expand All @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader {
r := &ManualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
externalProducers: cfg.producers,
}
r.externalProducers.Store([]Producer{})
return r
}

Expand All @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) {
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
//
// This method is safe to call concurrently.
func (mr *ManualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
return
}
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand Down Expand Up @@ -112,7 +95,7 @@ func (mr *ManualReader) Shutdown(context.Context) error {
defer mr.mu.Unlock()
mr.isShutdown = true
// release references to Producer(s)
mr.externalProducers.Store([]Producer{})
mr.externalProducers = nil
err = nil
})
return err
Expand Down Expand Up @@ -150,7 +133,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
for _, producer := range mr.externalProducers {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -183,6 +166,7 @@ func (r *ManualReader) MarshalLog() interface{} {
type manualReaderConfig struct {
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
producers []Producer
}

// newManualReaderConfig returns a manualReaderConfig configured with options.
Expand Down
8 changes: 7 additions & 1 deletion sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ import (
)

func TestManualReader(t *testing.T) {
suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }})
suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader {
var mopts []ManualReaderOption
for _, o := range opts {
mopts = append(mopts, o)
}
return NewManualReader(mopts...)
}})
}

func BenchmarkManualReader(b *testing.B) {
Expand Down
41 changes: 13 additions & 28 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
interval time.Duration
timeout time.Duration
producers []Producer
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
Expand Down Expand Up @@ -115,18 +116,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
externalProducers: conf.producers,
rmPool: sync.Pool{
New: func() interface{} {
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store([]Producer{})

go func() {
defer func() { close(r.done) }()
Expand All @@ -143,7 +144,7 @@ type PeriodicReader struct {

mu sync.Mutex
isShutdown bool
externalProducers atomic.Value
externalProducers []Producer

interval time.Duration
timeout time.Duration
Expand Down Expand Up @@ -194,22 +195,6 @@ func (r *PeriodicReader) register(p sdkProducer) {
}
}

// RegisterProducer registers p as an external Producer of this reader.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
return
}
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand Down Expand Up @@ -275,7 +260,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd
return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
for _, producer := range r.externalProducers {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -351,7 +336,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
defer r.mu.Unlock()
r.isShutdown = true
// release references to Producer(s)
r.externalProducers.Store([]Producer{})
r.externalProducers = nil
})
return err
}
Expand Down
22 changes: 11 additions & 11 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,16 @@ type periodicReaderTestSuite struct {
}

func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()
ts.Reader = ts.Factory()

e := &fnExporter{
exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{}))
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand All @@ -232,8 +231,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
func TestPeriodicReader(t *testing.T) {
suite.Run(t, &periodicReaderTestSuite{
readerTestSuite: &readerTestSuite{
Factory: func() Reader {
return NewPeriodicReader(new(fnExporter))
Factory: func(opts ...ReaderOption) Reader {
var popts []PeriodicReaderOption
for _, o := range opts {
popts = append(popts, o)
}
return NewPeriodicReader(new(fnExporter), popts...)
},
},
})
Expand Down Expand Up @@ -290,9 +293,8 @@ func TestPeriodicReaderRun(t *testing.T) {
},
}

r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand All @@ -319,9 +321,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -331,9 +332,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
Expand Down
36 changes: 29 additions & 7 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ type Reader interface {
// and send aggregated metric measurements.
register(sdkProducer)

// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
//
// This method needs to be concurrent safe.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality

Expand Down Expand Up @@ -167,3 +160,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation {
}
panic("unknown instrument kind")
}

// ReaderOption is an option which can be applied to manual or Periodic
// readers.
type ReaderOption interface {
PeriodicReaderOption
ManualReaderOption
}

// WithProducers registers producers as an external Producer of metric data
// for this Reader.
func WithProducer(p Producer) ReaderOption {
return producerOption{p: p}
}

type producerOption struct {
p Producer
}

// applyManual returns a manualReaderConfig with option applied.
func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.producers = append(c.producers, o.p)
return c
}

// applyPeriodic returns a periodicReaderConfig with option applied.
func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
c.producers = append(c.producers, o.p)
return c
}
Loading

0 comments on commit 1740d9c

Please sign in to comment.