Skip to content

Commit

Permalink
[internal/otelarrow] Fix test flake (for 34719) (#34889)
Browse files Browse the repository at this point in the history
**Description:** Restore a skipped test, after understanding the nature
of the problem.

The problem was mostly addressed in
#34794,
which left the test disabled. The test had been flaky because while
testing for an out-of-memory condition, the test could fail for timeout
or other reason. To make the test more reliable, this now waits until at
least one ArrowTraces span has been received by both components. After
one span is available, it checks that the expected log messages are
present on both sides.

**Link to tracking Issue:** 
Fixes #34719.

**Testing:** ✅

---------

Co-authored-by: Curtis Robert <crobert@splunk.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 6, 2024
1 parent f970421 commit af21ce7
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -83,14 +82,18 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
return tc.sink.ConsumeTraces(ctx, td)
}

func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
tset := componenttest.NewNopTelemetrySettings()

core, obslogs := observer.New(zapcore.InfoLevel)

exp := tracetest.NewInMemoryExporter()

tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
// Note: To debug any of the logs-based assertions in this test, uncomment
// the following line:
//
// tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
tset.Logger = zap.New(core)
tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp))

return tset, obslogs, exp
Expand Down Expand Up @@ -329,6 +332,9 @@ func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) {
for _, f := range rl.Context {
attrs = append(attrs, f.Key)

// One way we can see memory limit errors is through the
// OTel-Arrow common "arrow stream error" message, which both
// sides will log.
if rl.Message == "arrow stream error" && f.Key == "message" {
msgs = append(msgs, f.String)
}
Expand All @@ -346,7 +352,11 @@ var limitRegexp = regexp.MustCompile(`memory limit exceeded`)

func countMemoryLimitErrors(msgs []string) (cnt int) {
for _, msg := range msgs {
if limitRegexp.MatchString(msg) {
// The memory errors are expected from the receiver,
// so whether these print on the exporter or receiver,
// the message will contain "otel-arrow decode" from
// the receiver.
if limitRegexp.MatchString(msg) && strings.Contains(msg, "otel-arrow decode") {
cnt++
}
}
Expand All @@ -357,12 +367,12 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,
eSigs, eMsgs := logSigs(testCon.expLogs)
rSigs, rMsgs := logSigs(testCon.recvLogs)

// Test for arrow stream errors.
require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs)
// Test for arrow receiver stream errors on both sides.
require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)

// Ensure the errors include memory limit errors.

// Ensure both side's error logs include memory limit errors
// one way or another.
require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)

Expand All @@ -374,7 +384,9 @@ func consumerSuccess(t *testing.T, err error) {
}

func consumerFailure(t *testing.T, err error) {
require.Error(t, err)
if err == nil {
return
}

// there should be no permanent errors anywhere in this test.
require.False(t, consumererror.IsPermanent(err),
Expand Down Expand Up @@ -414,32 +426,37 @@ func TestIntegrationTracesSimple(t *testing.T) {
}

func TestIntegrationMemoryLimited(t *testing.T) {
// This test is flaky, it only shows on Windows. This will be
// addressed in a separate PR.
t.Skip("test flake disabled")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// until 10 threads can write 100 spans
// until exporter and receiver finish at least one ArrowTraces span.
params := testParams{
threadCount: 10,
requestUntil: func(test *testConsumer) bool {
cnt := 0
for _, span := range test.expSpans.GetSpans() {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
cnt++
cf := func(spans tracetest.SpanStubs) (cnt int) {
for _, span := range spans {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
cnt++
}
}
return
}
return cnt == 0 || test.sentSpans.Load() < 100

rcnt := cf(test.recvSpans.GetSpans())
ecnt := cf(test.expSpans.GetSpans())
return ecnt == 0 || rcnt == 0
},
}

testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
rcfg.Arrow.MemoryLimitMiB = 1
ecfg.Arrow.NumStreams = 10
// Shorten timeouts for this test, because we intend
// for it to fail and don't want to wait for retries.
ecfg.TimeoutSettings.Timeout = 5 * time.Second
ecfg.RetryConfig.InitialInterval = 1 * time.Second
ecfg.RetryConfig.MaxInterval = 2 * time.Second
ecfg.RetryConfig.MaxElapsedTime = 30 * time.Second
ecfg.Arrow.MaxStreamLifetime = 5 * time.Second
}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
}

Expand Down

0 comments on commit af21ce7

Please sign in to comment.