Skip to content

Commit

Permalink
Add timeoutExporter (open-telemetry#5118)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 1, 2024
1 parent 7667f7b commit bddfbc6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
27 changes: 27 additions & 0 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"
"time"

"go.opentelemetry.io/otel"
)
Expand Down Expand Up @@ -53,6 +54,32 @@ func (noopExporter) Shutdown(context.Context) error { return nil }

func (noopExporter) ForceFlush(context.Context) error { return nil }

// timeoutExporter wraps an Exporter and ensures any call to Export will have a
// timeout for the context.
type timeoutExporter struct {
Exporter

// timeout is the maximum time an export is attempted.
timeout time.Duration
}

// newTimeoutExporter wraps exporter with an Exporter that limits the context
// lifetime passed to Export to be timeout. If timeout is less than or equal to
// zero, exporter will be returned directly.
func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter {
if timeout <= 0 {
return exp
}
return &timeoutExporter{Exporter: exp, timeout: timeout}
}

// Export sets the timeout of ctx before calling the Exporter e wraps.
func (e *timeoutExporter) Export(ctx context.Context, records []Record) error {
ctx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
return e.Exporter.Export(ctx, records)
}

// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
Expand Down
47 changes: 47 additions & 0 deletions sdk/log/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type instruction struct {
type testExporter struct {
// Err is the error returned by all methods of the testExporter.
Err error
// ExportTrigger is read from prior to returning from the Export method if
// non-nil.
ExportTrigger chan struct{}

// Counts of method calls.
exportN, shutdownN, forceFlushN *int32
Expand Down Expand Up @@ -74,6 +77,13 @@ func (e *testExporter) Records() [][]Record {

func (e *testExporter) Export(ctx context.Context, r []Record) error {
atomic.AddInt32(e.exportN, 1)
if e.ExportTrigger != nil {
select {
case <-e.ExportTrigger:
case <-ctx.Done():
return ctx.Err()
}
}
e.input <- instruction{Record: &r}
return e.Err
}
Expand Down Expand Up @@ -196,3 +206,40 @@ func TestExportSync(t *testing.T) {
assert.ElementsMatch(t, want, got, "record bodies")
})
}

func TestTimeoutExporter(t *testing.T) {
t.Run("ZeroTimeout", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newTimeoutExporter(exp, 0)
assert.Same(t, exp, e)
})

t.Run("Timeout", func(t *testing.T) {
trigger := make(chan struct{})
t.Cleanup(func() { close(trigger) })

exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
exp.ExportTrigger = trigger
e := newTimeoutExporter(exp, time.Nanosecond)

out := make(chan error, 1)
go func() {
out <- e.Export(context.Background(), make([]Record, 1))
}()

var err error
assert.Eventually(t, func() bool {
select {
case err = <-out:
return true
default:
return false
}
}, 2*time.Second, time.Microsecond)

assert.ErrorIs(t, err, context.DeadlineExceeded)
close(out)
})
}

0 comments on commit bddfbc6

Please sign in to comment.