Skip to content

Commit

Permalink
PeriodicReader.Shutdown now applies the periodic reader's timeout by …
Browse files Browse the repository at this point in the history
…default
  • Loading branch information
dashpole committed Jul 24, 2023
1 parent fd5284f commit 5be73a9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)
- `PeriodicReader.Shutdown` in `go.opentelemetry.io/otel/sdk/metric` now applies the periodic reader's timeout by default. (#TODO)

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
// Stop the run loop.
r.cancel()
<-r.done
Expand Down
40 changes: 40 additions & 0 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,46 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})

t.Run("Shutdown timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
case <-time.After(timeout + time.Millisecond):
*rm = testResourceMetricsA
case <-ctx.Done():
// we timed out before we could collect metrics
return ctx.Err()
}
return nil
}})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned")
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})

t.Run("Shutdown timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Millisecond):
case <-ctx.Done():
// we timed out before we could collect metrics
return nil, ctx.Err()
}
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
},
})
assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned")
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})
}

func BenchmarkPeriodicReader(b *testing.B) {
Expand Down

0 comments on commit 5be73a9

Please sign in to comment.