Skip to content

Commit

Permalink
Modify ForceFlush to abort after timeout/cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
humivo committed Mar 30, 2021
1 parent c61f4b6 commit a5db348
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Jaeger exporter was updated to use thrift v0.14.1. (#1712)
- Migrate from using internally built and maintained version of the OTLP to the one hosted at `go.opentelemetry.io/proto/otlp`. (#1713)
- Migrate from using `github.com/gogo/protobuf` to `google.golang.org/protobuf` to match `go.opentelemetry.io/proto/otlp`. (#1713)
- Modify `BatchSpanProcessor.ForceFlush` to abort after timeout/cancellation (#1618)

### Removed

Expand Down
18 changes: 17 additions & 1 deletion sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,23 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
return bsp.exportSpans(ctx)
var err error
if bsp.e != nil {
wait := make(chan struct{})
go func() {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
close(wait)
}()
// Wait until the wait group is done or the context is cancelled/timed out
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
}
return err
}

func WithMaxQueueSize(size int) BatchSpanProcessorOption {
Expand Down
30 changes: 30 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,33 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
}
assert.Equal(t, 1, bp.shutdownCount)
}

func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)

err := bsp.ForceFlush(context.Background())
assert.Equal(t, nil, err)
}

func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
// Add timeout to context to test deadline
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()

err := bsp.ForceFlush(ctx)
assert.Equal(t, context.DeadlineExceeded, err)
}

func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
ctx, cancel := context.WithCancel(context.Background())
// Cancel the context
cancel()

err := bsp.ForceFlush(ctx)
assert.Equal(t, context.Canceled, err)
}

0 comments on commit a5db348

Please sign in to comment.