Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SpanProcessor Shutdown with context and error #1264

Merged
merged 11 commits into from
Oct 27, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
They no longer track the gRPC codes. (#1214)
- The `StatusCode` field of the `SpanData` struct in the `go.opentelemetry.io/otel/sdk/export/trace` package now uses the codes package from this package instead of the gRPC project. (#1214)
- Move the `go.opentelemetry.io/otel/api/baggage` package into `go.opentelemetry.io/otel/propagators`. (#1217)
- A `Shutdown` method of `SpanProcessor` and all its implementations receives a context and returns an error. (#1264)

### Fixed

Expand Down
5 changes: 3 additions & 2 deletions example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func initProvider() func() {
pusher.Start()

return func() {
handleErr(tracerProvider.Shutdown(context.Background()), "failed to shutdown provider")
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
ctx := context.Background()
handleErr(tracerProvider.Shutdown(ctx), "failed to shutdown provider")
handleErr(exp.Shutdown(ctx), "failed to stop exporter")
pusher.Stop() // pushes any last exports to the receiver
}
}
Expand Down
18 changes: 15 additions & 3 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}

// ForceFlush exports all ended spans that have not yet been exported.
Expand Down
15 changes: 12 additions & 3 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}

type testOption struct {
Expand Down Expand Up @@ -222,8 +225,14 @@ func getSpanContext() otel.SpanContext {
func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})

bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}

// Multiple call to Shutdown() should not panic.
bsp.Shutdown()
err = bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}
5 changes: 3 additions & 2 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
}
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
global.Handle(s.Shutdown(context.Background()))
})
}
if len(new) > 1 {
Expand Down Expand Up @@ -190,7 +191,7 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {

for _, sps := range spss {
sps.state.Do(func() {
sps.sp.Shutdown()
global.Handle(sps.sp.Shutdown(ctx))
})
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type basicSpanProcesor struct {
running bool
}

func (t *basicSpanProcesor) Shutdown() {
func (t *basicSpanProcesor) Shutdown(context.Context) error {
t.running = false
return nil
}

func (t *basicSpanProcesor) OnStart(s *export.SpanData) {}
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
}

// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error {
return nil
}

// ForceFlush does nothing as there is no data to flush.
Expand Down
6 changes: 5 additions & 1 deletion sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
return
}

ssp.Shutdown()
err := ssp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the SimpleSpanProcessor down\n")
}
}
5 changes: 3 additions & 2 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -31,10 +32,10 @@ type SpanProcessor interface {
// and hence should not block.
OnEnd(sd *export.SpanData)

// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
Shutdown(ctx context.Context) error

// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
Expand Down
13 changes: 7 additions & 6 deletions sdk/trace/span_processor_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"time"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -33,9 +34,9 @@ type DurationFilter struct {
Max time.Duration
}

func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown() { f.Next.Shutdown() }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnEnd(sd *export.SpanData) {
if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min {
// Drop short lived spans.
Expand All @@ -59,9 +60,9 @@ type InstrumentationBlacklist struct {
Blacklist map[string]bool
}

func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) {
if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] {
// Drop spans from this instrumentation
Expand Down
24 changes: 14 additions & 10 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
export "go.opentelemetry.io/otel/sdk/export/trace"
)

type testSpanProcesor struct {
type testSpanProcessor struct {
name string
spansStarted []*export.SpanData
spansEnded []*export.SpanData
shutdownCount int
}

func (t *testSpanProcesor) OnStart(s *export.SpanData) {
func (t *testSpanProcessor) OnStart(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnStart",
Value: label.StringValue(t.name),
Expand All @@ -38,7 +38,7 @@ func (t *testSpanProcesor) OnStart(s *export.SpanData) {
t.spansStarted = append(t.spansStarted, s)
}

func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
func (t *testSpanProcessor) OnEnd(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnEnd",
Value: label.StringValue(t.name),
Expand All @@ -47,11 +47,12 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}

func (t *testSpanProcesor) Shutdown() {
func (t *testSpanProcessor) Shutdown(_ context.Context) error {
t.shutdownCount++
return nil
}

func (t *testSpanProcesor) ForceFlush() {
func (t *testSpanProcessor) ForceFlush() {
}

func TestRegisterSpanProcessort(t *testing.T) {
Expand Down Expand Up @@ -181,7 +182,10 @@ func TestSpanProcessorShutdown(t *testing.T) {
tp.RegisterSpanProcessor(sp)

wantCount := 1
sp.Shutdown()
err := sp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the testSpanProcessor down\n")
}

gotCount := sp.shutdownCount
if wantCount != gotCount {
Expand Down Expand Up @@ -216,12 +220,12 @@ func TestMultipleUnregisterSpanProcessorCalls(t *testing.T) {
}
}

func NewTestSpanProcessor(name string) *testSpanProcesor {
return &testSpanProcesor{name: name}
func NewTestSpanProcessor(name string) *testSpanProcessor {
return &testSpanProcessor{name: name}
}

func NewNamedTestSpanProcessors(names []string) []*testSpanProcesor {
tsp := []*testSpanProcesor{}
func NewNamedTestSpanProcessors(names []string) []*testSpanProcessor {
tsp := []*testSpanProcessor{}
for _, n := range names {
tsp = append(tsp, NewTestSpanProcessor(n))
}
Expand Down