Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal/otelarrow] Fix test flake (for 34719) #34889

Merged
merged 19 commits into from
Sep 6, 2024
Merged
57 changes: 37 additions & 20 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -83,14 +82,18 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
return tc.sink.ConsumeTraces(ctx, td)
}

func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
tset := componenttest.NewNopTelemetrySettings()

core, obslogs := observer.New(zapcore.InfoLevel)

exp := tracetest.NewInMemoryExporter()

tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
// Note: To debug any of the logs-based assertions in this test, uncomment
// the following line:
//
// tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
tset.Logger = zap.New(core)
tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp))

return tset, obslogs, exp
Expand Down Expand Up @@ -329,6 +332,9 @@ func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) {
for _, f := range rl.Context {
attrs = append(attrs, f.Key)

// One way we can see memory limit errors is through the
// OTel-Arrow common "arrow stream error" message, which both
// sides will log.
if rl.Message == "arrow stream error" && f.Key == "message" {
msgs = append(msgs, f.String)
}
Expand All @@ -346,7 +352,11 @@ var limitRegexp = regexp.MustCompile(`memory limit exceeded`)

func countMemoryLimitErrors(msgs []string) (cnt int) {
for _, msg := range msgs {
if limitRegexp.MatchString(msg) {
// The memory errors are expected from the receiver,
// so whether these print on the exporter or receiver,
// the message will contain "otel-arrow decode" from
// the receiver.
if limitRegexp.MatchString(msg) && strings.Contains(msg, "otel-arrow decode") {
cnt++
}
}
Expand All @@ -357,12 +367,12 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,
eSigs, eMsgs := logSigs(testCon.expLogs)
rSigs, rMsgs := logSigs(testCon.recvLogs)

// Test for arrow stream errors.
require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs)
// Test for arrow receiver stream errors on both sides.
require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)

// Ensure the errors include memory limit errors.

// Ensure both side's error logs include memory limit errors
// one way or another.
require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)

Expand All @@ -374,7 +384,9 @@ func consumerSuccess(t *testing.T, err error) {
}

func consumerFailure(t *testing.T, err error) {
require.Error(t, err)
if err == nil {
return
}

// there should be no permanent errors anywhere in this test.
require.False(t, consumererror.IsPermanent(err),
Expand Down Expand Up @@ -414,32 +426,37 @@ func TestIntegrationTracesSimple(t *testing.T) {
}

func TestIntegrationMemoryLimited(t *testing.T) {
// This test is flaky, it only shows on Windows. This will be
// addressed in a separate PR.
t.Skip("test flake disabled")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// until 10 threads can write 100 spans
// until exporter and receiver finish at least one ArrowTraces span.
params := testParams{
threadCount: 10,
requestUntil: func(test *testConsumer) bool {
cnt := 0
for _, span := range test.expSpans.GetSpans() {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
cnt++
cf := func(spans tracetest.SpanStubs) (cnt int) {
for _, span := range spans {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
cnt++
}
}
return
}
return cnt == 0 || test.sentSpans.Load() < 100

rcnt := cf(test.recvSpans.GetSpans())
ecnt := cf(test.expSpans.GetSpans())
return ecnt == 0 || rcnt == 0
},
}

testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
rcfg.Arrow.MemoryLimitMiB = 1
ecfg.Arrow.NumStreams = 10
// Shorten timeouts for this test, because we intend
// for it to fail and don't want to wait for retries.
ecfg.TimeoutSettings.Timeout = 5 * time.Second
ecfg.RetryConfig.InitialInterval = 1 * time.Second
ecfg.RetryConfig.MaxInterval = 2 * time.Second
ecfg.RetryConfig.MaxElapsedTime = 30 * time.Second
ecfg.Arrow.MaxStreamLifetime = 5 * time.Second
}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
}

Expand Down