Skip to content

Commit

Permalink
Add ExportTimeout option to batch span processor (#1755)
Browse files Browse the repository at this point in the history
* Add ExportTimeout option

* Adjust tests

* Update CHANGELOG

* Beef up the exporter timeout test

* Beef up exporter test - attempt #2

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
matej-g and MrAlias committed Apr 5, 2021
1 parent c6b92d5 commit 3c7face
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added

- Option `ExportTimeout` was added to batch span processor. (#1755)
- Adds semantic conventions for exceptions. (#1492)
- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758)
- `OTEL_EXPORTER_OTLP_ENDPOINT`
Expand Down
19 changes: 19 additions & 0 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
const (
DefaultMaxQueueSize = 2048
DefaultBatchTimeout = 5000 * time.Millisecond
DefaultExportTimeout = 30000 * time.Millisecond
DefaultMaxExportBatchSize = 512
)

Expand All @@ -44,6 +45,11 @@ type BatchSpanProcessorOptions struct {
// The default value of BatchTimeout is 5000 msec.
BatchTimeout time.Duration

// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
// is reached, the export will be cancelled.
// The default value of ExportTimeout is 30000 msec.
ExportTimeout time.Duration

// MaxExportBatchSize is the maximum number of spans to process in a single batch.
// If there are more than one batch worth of spans then it processes multiple batches
// of spans one batch after the other without any delay.
Expand Down Expand Up @@ -83,6 +89,7 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil)
func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout,
ExportTimeout: DefaultExportTimeout,
MaxQueueSize: DefaultMaxQueueSize,
MaxExportBatchSize: DefaultMaxExportBatchSize,
}
Expand Down Expand Up @@ -185,6 +192,12 @@ func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
}
}

func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.ExportTimeout = timeout
}
}

func WithBlocking() BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BlockOnQueueFull = true
Expand All @@ -198,6 +211,12 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()

if bsp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
defer cancel()
}

if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
return err
Expand Down
48 changes: 39 additions & 9 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,23 @@ type testBatchExporter struct {
sizes []int
batchCount int
shutdownCount int
delay time.Duration
err error
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*export.SpanSnapshot) error {
t.mu.Lock()
defer t.mu.Unlock()

time.Sleep(t.delay)

select {
case <-ctx.Done():
t.err = ctx.Err()
return ctx.Err()
default:
}

t.spans = append(t.spans, ss...)
t.sizes = append(t.sizes, len(ss))
t.batchCount++
Expand Down Expand Up @@ -88,23 +99,35 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
}

type testOption struct {
name string
o []sdktrace.BatchSpanProcessorOption
wantNumSpans int
wantBatchCount int
genNumSpans int
parallel bool
name string
o []sdktrace.BatchSpanProcessorOption
wantNumSpans int
wantBatchCount int
wantExportTimeout bool
genNumSpans int
delayExportBy time.Duration
parallel bool
}

func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
schDelay := 200 * time.Millisecond
exportTimeout := time.Millisecond
options := []testOption{
{
name: "default BatchSpanProcessorOptions",
wantNumSpans: 2053,
wantBatchCount: 4,
genNumSpans: 2053,
},
{
name: "non-default ExportTimeout",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithExportTimeout(exportTimeout),
},
wantExportTimeout: true,
genNumSpans: 2053,
delayExportBy: 2 * exportTimeout, // to ensure export timeout
},
{
name: "non-default BatchTimeout",
o: []sdktrace.BatchSpanProcessorOption{
Expand Down Expand Up @@ -171,7 +194,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
}
for _, option := range options {
t.Run(option.name, func(t *testing.T) {
te := testBatchExporter{}
te := testBatchExporter{
delay: option.delayExportBy,
}
tp := basicTracerProvider(t)
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
Expand All @@ -185,17 +210,22 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
tp.UnregisterSpanProcessor(ssp)

gotNumOfSpans := te.len()
if option.wantNumSpans != gotNumOfSpans {
if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans {
t.Errorf("number of exported span: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}

gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}

if option.wantExportTimeout && te.err != context.DeadlineExceeded {
t.Errorf("context deadline: got err %+v, want %+v\n",
te.err, context.DeadlineExceeded)
}
})
}
}
Expand Down

0 comments on commit 3c7face

Please sign in to comment.