From bddfbc68caa3101a11e02aa4de6f49fc31880136 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 1 Apr 2024 09:17:07 -0700 Subject: [PATCH] Add timeoutExporter (#5118) --- sdk/log/exporter.go | 27 +++++++++++++++++++++++ sdk/log/exporter_test.go | 47 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index dff4dc9c28d..9f85f8a1fd9 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "time" "go.opentelemetry.io/otel" ) @@ -53,6 +54,32 @@ func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } +// timeoutExporter wraps an Exporter and ensures any call to Export will have a +// timeout for the context. +type timeoutExporter struct { + Exporter + + // timeout is the maximum time an export is attempted. + timeout time.Duration +} + +// newTimeoutExporter wraps exporter with an Exporter that limits the context +// lifetime passed to Export to be timeout. If timeout is less than or equal to +// zero, exporter will be returned directly. +func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter { + if timeout <= 0 { + return exp + } + return &timeoutExporter{Exporter: exp, timeout: timeout} +} + +// Export sets the timeout of ctx before calling the Exporter e wraps. +func (e *timeoutExporter) Export(ctx context.Context, records []Record) error { + ctx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + return e.Exporter.Export(ctx, records) +} + // exportSync exports all data from input using exporter in a spawned // goroutine. The returned chan will be closed when the spawned goroutine // completes. diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 4eb2056d1b2..3c37b83ad38 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -25,6 +25,9 @@ type instruction struct { type testExporter struct { // Err is the error returned by all methods of the testExporter. Err error + // ExportTrigger is read from prior to returning from the Export method if + // non-nil. + ExportTrigger chan struct{} // Counts of method calls. exportN, shutdownN, forceFlushN *int32 @@ -74,6 +77,13 @@ func (e *testExporter) Records() [][]Record { func (e *testExporter) Export(ctx context.Context, r []Record) error { atomic.AddInt32(e.exportN, 1) + if e.ExportTrigger != nil { + select { + case <-e.ExportTrigger: + case <-ctx.Done(): + return ctx.Err() + } + } e.input <- instruction{Record: &r} return e.Err } @@ -196,3 +206,40 @@ func TestExportSync(t *testing.T) { assert.ElementsMatch(t, want, got, "record bodies") }) } + +func TestTimeoutExporter(t *testing.T) { + t.Run("ZeroTimeout", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newTimeoutExporter(exp, 0) + assert.Same(t, exp, e) + }) + + t.Run("Timeout", func(t *testing.T) { + trigger := make(chan struct{}) + t.Cleanup(func() { close(trigger) }) + + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + exp.ExportTrigger = trigger + e := newTimeoutExporter(exp, time.Nanosecond) + + out := make(chan error, 1) + go func() { + out <- e.Export(context.Background(), make([]Record, 1)) + }() + + var err error + assert.Eventually(t, func() bool { + select { + case err = <-out: + return true + default: + return false + } + }, 2*time.Second, time.Microsecond) + + assert.ErrorIs(t, err, context.DeadlineExceeded) + close(out) + }) +}