Skip to content

Commit

Permalink
Adds support for timeout on the otlp/gRPC exporter (#1821)
Browse files Browse the repository at this point in the history
* initial support for timeout on otlp grpc exporter

* fix tests

* run make

* update changelog

* update changelog

* apply suggestions

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
paivagustavo and MrAlias authored Apr 20, 2021
1 parent 081cc61 commit 70bc9eb
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `Event` and `Link` struct types from the `go.opentelemetry.io/otel` package now include a `DroppedAttributeCount` field to record the number of attributes that were not recorded due to configured limits being reached. (#1771)
- The Jaeger exporter now reports dropped attributes for a Span event in the exported log. (#1771)
- Adds `k8s.node.name` and `k8s.node.uid` attribute keys to the `semconv` package. (#1789)
- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821)

### Fixed

Expand Down
4 changes: 4 additions & 0 deletions exporters/otlp/otlpgrpc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
}
ctx, cancel := d.metricsDriver.connection.contextWithStop(ctx)
defer cancel()
ctx, tCancel := context.WithTimeout(ctx, d.metricsDriver.connection.sCfg.Timeout)
defer tCancel()

rms, err := transform.CheckpointSet(ctx, selector, cps, 1)
if err != nil {
Expand Down Expand Up @@ -162,6 +164,8 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
}
ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx)
defer cancel()
ctx, tCancel := context.WithTimeout(ctx, d.tracesDriver.connection.sCfg.Timeout)
defer tCancel()

protoSpans := transform.SpanData(ss)
if len(protoSpans) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type mockTraceService struct {
mu sync.RWMutex
storage otlptest.SpansStorage
headers metadata.MD
delay time.Duration
}

func (mts *mockTraceService) getHeaders() metadata.MD {
Expand All @@ -73,6 +74,9 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
}

func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) {
if mts.delay > 0 {
time.Sleep(mts.delay)
}
reply := &collectortracepb.ExportTraceServiceResponse{}
mts.mu.Lock()
defer mts.mu.Unlock()
Expand All @@ -86,6 +90,7 @@ type mockMetricService struct {

mu sync.RWMutex
storage otlptest.MetricsStorage
delay time.Duration
}

func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
Expand All @@ -95,6 +100,9 @@ func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
}

func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) {
if mms.delay > 0 {
time.Sleep(mms.delay)
}
reply := &collectormetricpb.ExportMetricsServiceResponse{}
mms.mu.Lock()
defer mms.mu.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpgrpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,21 @@ func WithDialOption(opts ...grpc.DialOption) Option {
cfg.DialOptions = opts
})
}

// WithTimeout tells the driver the max waiting time for the backend to process
// each spans or metrics batch. If unset, the default will be 10 seconds.
func WithTimeout(duration time.Duration) Option {
return otlpconfig.WithTimeout(duration)
}

// WithTracesTimeout tells the driver the max waiting time for the backend to process
// each spans batch. If unset, the default will be 10 seconds.
func WithTracesTimeout(duration time.Duration) Option {
return otlpconfig.WithTracesTimeout(duration)
}

// WithMetricsTimeout tells the driver the max waiting time for the backend to process
// each metrics batch. If unset, the default will be 10 seconds.
func WithMetricsTimeout(duration time.Duration) Option {
return otlpconfig.WithMetricsTimeout(duration)
}
87 changes: 87 additions & 0 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -302,6 +305,90 @@ func TestNewExporter_withHeaders(t *testing.T) {
assert.Equal(t, "value1", headers.Get("header1")[0])
}

func TestNewExporter_WithTimeout(t *testing.T) {
tts := []struct {
name string
fn func(exp *otlp.Exporter) error
timeout time.Duration
metrics int
spans int
code codes.Code
delay bool
}{
{
name: "Timeout Spans",
fn: func(exp *otlp.Exporter) error {
return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}})
},
timeout: time.Millisecond * 100,
code: codes.DeadlineExceeded,
delay: true,
},
{
name: "Timeout Metrics",
fn: func(exp *otlp.Exporter) error {
return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{})
},
timeout: time.Millisecond * 100,
code: codes.DeadlineExceeded,
delay: true,
},

{
name: "No Timeout Spans",
fn: func(exp *otlp.Exporter) error {
return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}})
},
timeout: time.Minute,
spans: 1,
code: codes.OK,
},
{
name: "No Timeout Metrics",
fn: func(exp *otlp.Exporter) error {
return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{})
},
timeout: time.Minute,
metrics: 1,
code: codes.OK,
},
}

for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {

mc := runMockCollector(t)
if tt.delay {
mc.traceSvc.delay = time.Second * 10
mc.metricSvc.delay = time.Second * 10
}
defer func() {
_ = mc.stop()
}()

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout))
defer func() {
_ = exp.Shutdown(ctx)
}()

err := tt.fn(exp)

if tt.code == codes.OK {
require.NoError(t, err)
} else {
require.Error(t, err)
}

s := status.Convert(err)
require.Equal(t, tt.code, s.Code())

require.Len(t, mc.getSpans(), tt.spans)
require.Len(t, mc.getMetrics(), tt.metrics)
})
}
}

func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
mc := runMockCollector(t)
defer func() {
Expand Down

0 comments on commit 70bc9eb

Please sign in to comment.