From 3c2935f0e3d1398c4e32a3b4a0398eff961e2ad3 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Mon, 16 Oct 2023 21:52:05 +0530 Subject: [PATCH] [chore]: Expand e2e testbed (#27251) Related issue: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20552 Tweak the mock-backend to do the following: - Receives data from the receiver. - Returns errors randomly to our receiver, which attempts to resend/drop the data. This is helpful when we're required to test random behaviors of the collector and ensure reliable data delivery. This is my initial PR to expand the testbed. This will help my further efforts to expand the testbed. @omrozowicz-splunk and I plan on adding `sending_queue` support to the testbed and expanding the testing capabilities. --------- Co-authored-by: Daniel Jaglowski --- .../traces/correctness_test.go | 60 ++++++++++++ testbed/go.mod | 2 +- testbed/testbed/load_generator.go | 93 +++++++++++++++---- testbed/testbed/mock_backend.go | 63 +++++++++++++ testbed/testbed/options.go | 7 ++ testbed/testbed/receivers.go | 17 +++- testbed/testbed/test_case.go | 5 + testbed/testbed/validator.go | 4 +- 8 files changed, 226 insertions(+), 25 deletions(-) diff --git a/testbed/correctnesstests/traces/correctness_test.go b/testbed/correctnesstests/traces/correctness_test.go index 713042848e11..5a1aab46b516 100644 --- a/testbed/correctnesstests/traces/correctness_test.go +++ b/testbed/correctnesstests/traces/correctness_test.go @@ -92,3 +92,63 @@ func testWithTracingGoldenDataset( tc.ValidateData() } + +func TestSporadicGoldenDataset(t *testing.T) { + testCases := []struct { + decisionFunc func() error + }{ + { + decisionFunc: testbed.RandomNonPermanentError, + }, + { + decisionFunc: testbed.RandomPermanentError, + }, + } + for _, tt := range testCases { + factories, err := testbed.Components() + require.NoError(t, err, "default components resulted in: %v", err) + runner := 
testbed.NewInProcessCollector(factories) + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewGoldenDataProvider( + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt", + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_spans.txt", + "") + sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: false +`) + receiver.WithQueue(` + sending_queue: + enabled: false +`) + _, err = runner.PrepareConfig(correctnesstests.CreateConfigYaml(sender, receiver, nil, "traces")) + require.NoError(t, err, "collector configuration resulted in: %v", err) + validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), dataProvider) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + runner, + validator, + correctnessResults, + testbed.WithSkipResults(), + testbed.WithDecisionFunc(tt.decisionFunc), + ) + defer tc.Stop() + tc.StartBackend() + tc.StartAgent() + tc.StartLoad(options) + tc.Sleep(3 * time.Second) + + tc.StopLoad() + + tc.WaitForN(func() bool { + return tc.LoadGenerator.DataItemsSent()-tc.LoadGenerator.PermanentErrors() == tc.MockBackend.DataItemsReceived() + }, 5*time.Second, "all data items received") + tc.StopAgent() + tc.ValidateData() + } +} diff --git a/testbed/go.mod b/testbed/go.mod index 946145131893..7a2cac8a9510 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -55,6 +55,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/text v0.13.0 + google.golang.org/grpc v1.58.3 ) require ( @@ -259,7 +260,6 @@ require ( google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc 
v0.0.0-20230920204549-e6e6cdab5c13 // indirect - google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/testbed/testbed/load_generator.go b/testbed/testbed/load_generator.go index 7172d82045b2..2bf3dcb6b78f 100644 --- a/testbed/testbed/load_generator.go +++ b/testbed/testbed/load_generator.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/consumer/consumererror" "golang.org/x/text/message" ) @@ -25,6 +26,10 @@ type LoadGenerator struct { // Number of data items (spans or metric data points) sent. dataItemsSent atomic.Uint64 + // Number of permanent errors received + permanentErrors atomic.Uint64 + nonPermanentErrors atomic.Uint64 + stopOnce sync.Once stopWait sync.WaitGroup stopSignal chan struct{} @@ -109,6 +114,14 @@ func (lg *LoadGenerator) DataItemsSent() uint64 { return lg.dataItemsSent.Load() } +func (lg *LoadGenerator) PermanentErrors() uint64 { + return lg.permanentErrors.Load() +} + +func (lg *LoadGenerator) NonPermanentErrors() uint64 { + return lg.nonPermanentErrors.Load() +} + // IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data // directly via TestCases's Sender. 
This is necessary so that the total number of sent // items in the end is correct, because the reports are printed from LoadGenerator's @@ -184,12 +197,26 @@ func (lg *LoadGenerator) generateTrace() { return } - err := traceSender.ConsumeTraces(context.Background(), traceData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send traces: %v", err) + for { + err := traceSender.ConsumeTraces(context.Background(), traceData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(traceData.SpanCount())) + continue + } + + lg.permanentErrors.Add(uint64(traceData.SpanCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send traces: %v", err) + } + break } } @@ -200,13 +227,26 @@ func (lg *LoadGenerator) generateMetrics() { if done { return } - - err := metricSender.ConsumeMetrics(context.Background(), metricData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send metrics: %v", err) + for { + err := metricSender.ConsumeMetrics(context.Background(), metricData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(metricData.DataPointCount())) + continue + } + + lg.permanentErrors.Add(uint64(metricData.DataPointCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send metrics: %v", err) + } + break } } @@ -217,12 +257,25 @@ func (lg *LoadGenerator) generateLog() { if done { return } - - err := logSender.ConsumeLogs(context.Background(), logData) - if err == nil { - lg.prevErr = nil - 
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send logs: %v", err) + for { + err := logSender.ConsumeLogs(context.Background(), logData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(logData.LogRecordCount())) + continue + } + + lg.permanentErrors.Add(uint64(logData.LogRecordCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send logs: %v", err) + } + break } } diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index 751ac496ec38..e6e932985633 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -5,18 +5,28 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" + "errors" "log" + "math/rand" "os" "sync" "sync/atomic" "time" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +var errNonPermanent = errors.New("non permanent error") +var errPermanent = errors.New("permanent error") + +type decisionFunc func() error + // MockBackend is a backend that allows receiving the data locally. type MockBackend struct { // Metric and trace consumers @@ -41,6 +51,13 @@ type MockBackend struct { ReceivedTraces []ptrace.Traces ReceivedMetrics []pmetric.Metrics ReceivedLogs []plog.Logs + + DroppedTraces []ptrace.Traces + DroppedMetrics []pmetric.Metrics + DroppedLogs []plog.Logs + + // decision to return permanent/non-permanent errors + decision decisionFunc } // NewMockBackend creates a new mock backend that receives data using specified receiver. 
@@ -51,6 +68,7 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { tc: &MockTraceConsumer{}, mc: &MockMetricConsumer{}, lc: &MockLogConsumer{}, + decision: func() error { return nil }, } mb.tc.backend = mb mb.mc.backend = mb @@ -58,6 +76,10 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { return mb } +func (mb *MockBackend) WithDecisionFunc(decision decisionFunc) { + mb.decision = decision +} + // Start a backend. func (mb *MockBackend) Start() error { log.Printf("Starting mock backend...") @@ -161,6 +183,13 @@ func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities { } func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + if err := tc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && tc.backend.isRecording { + tc.backend.DroppedTraces = append(tc.backend.DroppedTraces, td) + } + return err + } + tc.numSpansReceived.Add(uint64(td.SpanCount())) rs := td.ResourceSpans() @@ -208,6 +237,13 @@ func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities { } func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + if err := mc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && mc.backend.isRecording { + mc.backend.DroppedMetrics = append(mc.backend.DroppedMetrics, md) + } + return err + } + mc.numMetricsReceived.Add(uint64(md.DataPointCount())) mc.backend.ConsumeMetric(md) return nil @@ -233,8 +269,35 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities { } func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error { + if err := lc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && lc.backend.isRecording { + lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld) + } + return err + } + recordCount := ld.LogRecordCount() lc.numLogRecordsReceived.Add(uint64(recordCount)) lc.backend.ConsumeLogs(ld) return nil } + +// 
RandomNonPermanentError is a decision function that succeeds approximately +// half of the time and fails with a non-permanent error the rest of the time. +func RandomNonPermanentError() error { + code := codes.Unavailable + s := status.New(code, errNonPermanent.Error()) + if rand.Float32() < 0.5 { + return s.Err() + } + return nil +} + +// RandomPermanentError is a decision function that succeeds approximately +// half of the time and fails with a permanent error the rest of the time. +func RandomPermanentError() error { + if rand.Float32() < 0.5 { + return consumererror.NewPermanent(errPermanent) + } + return nil +} diff --git a/testbed/testbed/options.go b/testbed/testbed/options.go index 3b0e51d6f975..ac37313f466f 100644 --- a/testbed/testbed/options.go +++ b/testbed/testbed/options.go @@ -66,3 +66,10 @@ func WithResourceLimits(resourceSpec ResourceSpec) TestCaseOption { } } } + +// WithDecisionFunc enables our mock backend to behave sporadically. +func WithDecisionFunc(decision decisionFunc) TestCaseOption { + return func(tc *TestCase) { + tc.decision = decision + } +} diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 8c14efe4dffc..59937042e358 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -53,6 +53,8 @@ type BaseOTLPDataReceiver struct { metricsReceiver receiver.Metrics logReceiver receiver.Logs compression string + retry string + sendingQueue string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -91,6 +93,16 @@ func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDa return bor } +func (bor *BaseOTLPDataReceiver) WithRetry(retry string) *BaseOTLPDataReceiver { + bor.retry = retry + return bor +} + +func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver { + bor.sendingQueue = sendingQueue + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { if err := 
bor.traceReceiver.Shutdown(context.Background()); err != nil { return err @@ -114,9 +126,10 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { str := fmt.Sprintf(` %s: endpoint: "%s" + %s + %s tls: - insecure: true`, bor.exporterType, addr) - + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index 74dcb78dd9f3..c765f334ed9e 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -51,6 +51,9 @@ type TestCase struct { doneSignal chan struct{} errorCause string resultsSummary TestResultsSummary + + // decision makes mockbackend return permanent/non-permanent errors on a random basis + decision decisionFunc } const mibibyte = 1024 * 1024 @@ -77,6 +80,7 @@ func NewTestCase( agentProc: agentProc, validator: validator, resultsSummary: resultsSummary, + decision: func() error { return nil }, } // Get requested test case duration from env variable. 
@@ -111,6 +115,7 @@ func NewTestCase( require.NoError(t, err, "Cannot create generator") tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) + tc.MockBackend.WithDecisionFunc(tc.decision) go tc.logStats() diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index 8ce6326d52ff..b22d10200e2d 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -80,13 +80,13 @@ func NewCorrectTestValidator(senderName string, receiverName string, provider Da func (v *CorrectnessTestValidator) Validate(tc *TestCase) { if assert.EqualValues(tc.t, - int64(tc.LoadGenerator.DataItemsSent()), + int64(tc.LoadGenerator.DataItemsSent())-int64(tc.LoadGenerator.PermanentErrors()), int64(tc.MockBackend.DataItemsReceived()), "Received and sent counters do not match.") { log.Printf("Sent and received data counters match.") } if len(tc.MockBackend.ReceivedTraces) > 0 { - v.assertSentRecdTracingDataEqual(tc.MockBackend.ReceivedTraces) + v.assertSentRecdTracingDataEqual(append(tc.MockBackend.ReceivedTraces, tc.MockBackend.DroppedTraces...)) } assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") }