diff --git a/CHANGELOG.md b/CHANGELOG.md index a059c08cd1f8..7f4a44b60aa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added +- Option `ExportTimeout` was added to batch span processor. (#1755) - Adds semantic conventions for exceptions. (#1492) - Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758) - `OTEL_EXPORTER_OTLP_ENDPOINT` diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 25d556e5f359..a2494135f4d3 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -28,6 +28,7 @@ import ( const ( DefaultMaxQueueSize = 2048 DefaultBatchTimeout = 5000 * time.Millisecond + DefaultExportTimeout = 30000 * time.Millisecond DefaultMaxExportBatchSize = 512 ) @@ -44,6 +45,11 @@ type BatchSpanProcessorOptions struct { // The default value of BatchTimeout is 5000 msec. BatchTimeout time.Duration + // ExportTimeout specifies the maximum duration for exporting spans. If the timeout + // is reached, the export will be cancelled. + // The default value of ExportTimeout is 30000 msec. + ExportTimeout time.Duration + // MaxExportBatchSize is the maximum number of spans to process in a single batch. // If there are more than one batch worth of spans then it processes multiple batches // of spans one batch after the other without any delay. @@ -83,6 +89,7 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil) func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { o := BatchSpanProcessorOptions{ BatchTimeout: DefaultBatchTimeout, + ExportTimeout: DefaultExportTimeout, MaxQueueSize: DefaultMaxQueueSize, MaxExportBatchSize: DefaultMaxExportBatchSize, } @@ -185,6 +192,12 @@ func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { } } +func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.ExportTimeout = timeout + } +} + func WithBlocking() BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.BlockOnQueueFull = true @@ -198,6 +211,12 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { bsp.batchMutex.Lock() defer bsp.batchMutex.Unlock() + if bsp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + defer cancel() + } + if len(bsp.batch) > 0 { if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { return err diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index f2bf428e22a2..44c3c2f69f26 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -36,12 +36,23 @@ type testBatchExporter struct { sizes []int batchCount int shutdownCount int + delay time.Duration + err error } func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*export.SpanSnapshot) error { t.mu.Lock() defer t.mu.Unlock() + time.Sleep(t.delay) + + select { + case <-ctx.Done(): + t.err = ctx.Err() + return ctx.Err() + default: + } + t.spans = append(t.spans, ss...) t.sizes = append(t.sizes, len(ss)) t.batchCount++ @@ -88,16 +99,19 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) { } type testOption struct { - name string - o []sdktrace.BatchSpanProcessorOption - wantNumSpans int - wantBatchCount int - genNumSpans int - parallel bool + name string + o []sdktrace.BatchSpanProcessorOption + wantNumSpans int + wantBatchCount int + wantExportTimeout bool + genNumSpans int + delayExportBy time.Duration + parallel bool } func TestNewBatchSpanProcessorWithOptions(t *testing.T) { schDelay := 200 * time.Millisecond + exportTimeout := time.Millisecond options := []testOption{ { name: "default BatchSpanProcessorOptions", @@ -105,6 +119,15 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) { wantBatchCount: 4, genNumSpans: 2053, }, + { + name: "non-default ExportTimeout", + o: []sdktrace.BatchSpanProcessorOption{ + sdktrace.WithExportTimeout(exportTimeout), + }, + wantExportTimeout: true, + genNumSpans: 2053, + delayExportBy: 2 * exportTimeout, // to ensure export timeout + }, { name: "non-default BatchTimeout", o: []sdktrace.BatchSpanProcessorOption{ @@ -171,7 +194,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) { } for _, option := range options { t.Run(option.name, func(t *testing.T) { - te := testBatchExporter{} + te := testBatchExporter{ + delay: option.delayExportBy, + } tp := basicTracerProvider(t) ssp := createAndRegisterBatchSP(option, &te) if ssp == nil { @@ -185,17 +210,22 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) { tp.UnregisterSpanProcessor(ssp) gotNumOfSpans := te.len() - if option.wantNumSpans != gotNumOfSpans { + if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans { t.Errorf("number of exported span: got %+v, want %+v\n", gotNumOfSpans, option.wantNumSpans) } gotBatchCount := te.getBatchCount() - if gotBatchCount < option.wantBatchCount { + if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount { t.Errorf("number batches: got %+v, want >= %+v\n", gotBatchCount, option.wantBatchCount) t.Errorf("Batches %v\n", te.sizes) } + + if option.wantExportTimeout && te.err != context.DeadlineExceeded { + t.Errorf("context deadline: got err %+v, want %+v\n", + te.err, context.DeadlineExceeded) + } }) } }