From ab79a8b573965b40ea6dd518f8609563c2affa24 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Mon, 16 Oct 2023 16:43:27 +0530 Subject: [PATCH 1/2] [processor/resourcedetection] fix parsing error for windows (#27678) **Description:** The `system` detector extracts all the `cpu` info from the system even if you disable the configs and I believe this is where the bug kicks in. Disabling the settings will only stop it from setting the resource attributes. The [library](https://github.com/shirou/gopsutil/blob/v3.23.9/cpu/cpu_windows.go#L113) that we rely on doesn't extract some attributes for Windows OS (in this case, the field `cpu.Model`) and it leaves this field empty. This results in a bug when we try to parse an empty string. The long-term fix will be to extract `cpu.Model` in `gopsutil` upstream library. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27675 --- ...rsing-error-windows-resourcedetection.yaml | 27 +++++++++++++++++++ .../internal/system/system.go | 13 ++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 .chloggen/fix-parsing-error-windows-resourcedetection.yaml diff --git a/.chloggen/fix-parsing-error-windows-resourcedetection.yaml b/.chloggen/fix-parsing-error-windows-resourcedetection.yaml new file mode 100644 index 000000000000..e56b6e9e9eb8 --- /dev/null +++ b/.chloggen/fix-parsing-error-windows-resourcedetection.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/resourcedetection + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Don't parse the field `cpuInfo.Model` if it's blank. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27678] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index 9b6fa25bc64c..63aec3aaaddb 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -153,11 +153,16 @@ func setHostCPUInfo(d *Detector, cpuInfo cpu.InfoStat) error { } d.rb.SetHostCPUFamily(family) - model, err := strconv.ParseInt(cpuInfo.Model, 10, 64) - if err != nil { - return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err) + // For windows, this field is left blank. See https://github.com/shirou/gopsutil/blob/v3.23.9/cpu/cpu_windows.go#L113 + // Skip setting modelId if the field is blank. + // ISSUE: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27675 + if cpuInfo.Model != "" { + model, err := strconv.ParseInt(cpuInfo.Model, 10, 64) + if err != nil { + return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err) + } + d.rb.SetHostCPUModelID(model) } - d.rb.SetHostCPUModelID(model) d.rb.SetHostCPUModelName(cpuInfo.ModelName) d.rb.SetHostCPUStepping(int64(cpuInfo.Stepping)) From 3c2935f0e3d1398c4e32a3b4a0398eff961e2ad3 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Mon, 16 Oct 2023 21:52:05 +0530 Subject: [PATCH 2/2] [chore]: Expand e2e testbed (#27251) Related issue: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20552 Tweak the mock-backend to do following: - Receives data from the receiver. - Returns errors randomly to our receiver, which attempts to resend/drop the data. This is helpful when we're required to test random behaviors of the collector and ensure reliable data delivery. This is my initial PR to expand the testbed. This will help my further efforts to expand the testbed. Myself and @omrozowicz-splunk plan on adding `sending_queue` support to the testbed and expanding the testing capabilities. --------- Co-authored-by: Daniel Jaglowski --- .../traces/correctness_test.go | 60 ++++++++++++ testbed/go.mod | 2 +- testbed/testbed/load_generator.go | 93 +++++++++++++++---- testbed/testbed/mock_backend.go | 63 +++++++++++++ testbed/testbed/options.go | 7 ++ testbed/testbed/receivers.go | 17 +++- testbed/testbed/test_case.go | 5 + testbed/testbed/validator.go | 4 +- 8 files changed, 226 insertions(+), 25 deletions(-) diff --git a/testbed/correctnesstests/traces/correctness_test.go b/testbed/correctnesstests/traces/correctness_test.go index 713042848e11..5a1aab46b516 100644 --- a/testbed/correctnesstests/traces/correctness_test.go +++ b/testbed/correctnesstests/traces/correctness_test.go @@ -92,3 +92,63 @@ func testWithTracingGoldenDataset( tc.ValidateData() } + +func TestSporadicGoldenDataset(t *testing.T) { + testCases := []struct { + decisionFunc func() error + }{ + { + decisionFunc: testbed.RandomNonPermanentError, + }, + { + decisionFunc: testbed.RandomPermanentError, + }, + } + for _, tt := range testCases { + factories, err := testbed.Components() + require.NoError(t, err, "default components resulted in: %v", err) + runner := testbed.NewInProcessCollector(factories) + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewGoldenDataProvider( + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt", + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_spans.txt", + "") + sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: false +`) + receiver.WithQueue(` + sending_queue: + enabled: false +`) + _, err = runner.PrepareConfig(correctnesstests.CreateConfigYaml(sender, receiver, nil, "traces")) + require.NoError(t, err, "collector configuration resulted in: %v", err) + validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), dataProvider) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + runner, + validator, + correctnessResults, + testbed.WithSkipResults(), + testbed.WithDecisionFunc(tt.decisionFunc), + ) + defer tc.Stop() + tc.StartBackend() + tc.StartAgent() + tc.StartLoad(options) + tc.Sleep(3 * time.Second) + + tc.StopLoad() + + tc.WaitForN(func() bool { + return tc.LoadGenerator.DataItemsSent()-tc.LoadGenerator.PermanentErrors() == tc.MockBackend.DataItemsReceived() + }, 5*time.Second, "all data items received") + tc.StopAgent() + tc.ValidateData() + } +} diff --git a/testbed/go.mod b/testbed/go.mod index 946145131893..7a2cac8a9510 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -55,6 +55,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/text v0.13.0 + google.golang.org/grpc v1.58.3 ) require ( @@ -259,7 +260,6 @@ require ( google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect - google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/testbed/testbed/load_generator.go b/testbed/testbed/load_generator.go index 7172d82045b2..2bf3dcb6b78f 100644 --- a/testbed/testbed/load_generator.go +++ b/testbed/testbed/load_generator.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/consumer/consumererror" "golang.org/x/text/message" ) @@ -25,6 +26,10 @@ type LoadGenerator struct { // Number of data items (spans or metric data points) sent. dataItemsSent atomic.Uint64 + // Number of permanent errors received + permanentErrors atomic.Uint64 + nonPermanentErrors atomic.Uint64 + stopOnce sync.Once stopWait sync.WaitGroup stopSignal chan struct{} @@ -109,6 +114,14 @@ func (lg *LoadGenerator) DataItemsSent() uint64 { return lg.dataItemsSent.Load() } +func (lg *LoadGenerator) PermanentErrors() uint64 { + return lg.permanentErrors.Load() +} + +func (lg *LoadGenerator) NonPermanentErrors() uint64 { + return lg.nonPermanentErrors.Load() +} + // IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data // directly via TestCases's Sender. This is necessary so that the total number of sent // items in the end is correct, because the reports are printed from LoadGenerator's @@ -184,12 +197,26 @@ func (lg *LoadGenerator) generateTrace() { return } - err := traceSender.ConsumeTraces(context.Background(), traceData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send traces: %v", err) + for { + err := traceSender.ConsumeTraces(context.Background(), traceData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(traceData.SpanCount())) + continue + } + + lg.permanentErrors.Add(uint64(traceData.SpanCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send traces: %v", err) + } + break } } @@ -200,13 +227,26 @@ func (lg *LoadGenerator) generateMetrics() { if done { return } - - err := metricSender.ConsumeMetrics(context.Background(), metricData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send metrics: %v", err) + for { + err := metricSender.ConsumeMetrics(context.Background(), metricData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(metricData.DataPointCount())) + continue + } + + lg.permanentErrors.Add(uint64(metricData.DataPointCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send metrics: %v", err) + } + break } } @@ -217,12 +257,25 @@ func (lg *LoadGenerator) generateLog() { if done { return } - - err := logSender.ConsumeLogs(context.Background(), logData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send logs: %v", err) + for { + err := logSender.ConsumeLogs(context.Background(), logData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(logData.LogRecordCount())) + continue + } + + lg.permanentErrors.Add(uint64(logData.LogRecordCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send logs: %v", err) + } + break } } diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index 751ac496ec38..e6e932985633 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -5,18 +5,28 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" + "errors" "log" + "math/rand" "os" "sync" "sync/atomic" "time" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +var errNonPermanent = errors.New("non permanent error") +var errPermanent = errors.New("permanent error") + +type decisionFunc func() error + // MockBackend is a backend that allows receiving the data locally. type MockBackend struct { // Metric and trace consumers @@ -41,6 +51,13 @@ type MockBackend struct { ReceivedTraces []ptrace.Traces ReceivedMetrics []pmetric.Metrics ReceivedLogs []plog.Logs + + DroppedTraces []ptrace.Traces + DroppedMetrics []pmetric.Metrics + DroppedLogs []plog.Logs + + // decision to return permanent/non-permanent errors + decision decisionFunc } // NewMockBackend creates a new mock backend that receives data using specified receiver. @@ -51,6 +68,7 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { tc: &MockTraceConsumer{}, mc: &MockMetricConsumer{}, lc: &MockLogConsumer{}, + decision: func() error { return nil }, } mb.tc.backend = mb mb.mc.backend = mb @@ -58,6 +76,10 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { return mb } +func (mb *MockBackend) WithDecisionFunc(decision decisionFunc) { + mb.decision = decision +} + // Start a backend. func (mb *MockBackend) Start() error { log.Printf("Starting mock backend...") @@ -161,6 +183,13 @@ func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities { } func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + if err := tc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && tc.backend.isRecording { + tc.backend.DroppedTraces = append(tc.backend.DroppedTraces, td) + } + return err + } + tc.numSpansReceived.Add(uint64(td.SpanCount())) rs := td.ResourceSpans() @@ -208,6 +237,13 @@ func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities { } func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + if err := mc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && mc.backend.isRecording { + mc.backend.DroppedMetrics = append(mc.backend.DroppedMetrics, md) + } + return err + } + mc.numMetricsReceived.Add(uint64(md.DataPointCount())) mc.backend.ConsumeMetric(md) return nil @@ -233,8 +269,35 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities { } func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error { + if err := lc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && lc.backend.isRecording { + lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld) + } + return err + } + recordCount := ld.LogRecordCount() lc.numLogRecordsReceived.Add(uint64(recordCount)) lc.backend.ConsumeLogs(ld) return nil } + +// randomNonPermanentError is a decision function that succeeds approximately +// half of the time and fails with a non-permanent error the rest of the time. +func RandomNonPermanentError() error { + code := codes.Unavailable + s := status.New(code, errNonPermanent.Error()) + if rand.Float32() < 0.5 { + return s.Err() + } + return nil +} + +// randomPermanentError is a decision function that succeeds approximately +// half of the time and fails with a permanent error the rest of the time. +func RandomPermanentError() error { + if rand.Float32() < 0.5 { + return consumererror.NewPermanent(errPermanent) + } + return nil +} diff --git a/testbed/testbed/options.go b/testbed/testbed/options.go index 3b0e51d6f975..ac37313f466f 100644 --- a/testbed/testbed/options.go +++ b/testbed/testbed/options.go @@ -66,3 +66,10 @@ func WithResourceLimits(resourceSpec ResourceSpec) TestCaseOption { } } } + +// WithDecision enables our mock backend to behave sporadically +func WithDecisionFunc(decision decisionFunc) TestCaseOption { + return func(tc *TestCase) { + tc.decision = decision + } +} diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 8c14efe4dffc..59937042e358 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -53,6 +53,8 @@ type BaseOTLPDataReceiver struct { metricsReceiver receiver.Metrics logReceiver receiver.Logs compression string + retry string + sendingQueue string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -91,6 +93,16 @@ func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDa return bor } +func (bor *BaseOTLPDataReceiver) WithRetry(retry string) *BaseOTLPDataReceiver { + bor.retry = retry + return bor +} + +func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver { + bor.sendingQueue = sendingQueue + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { if err := bor.traceReceiver.Shutdown(context.Background()); err != nil { return err @@ -114,9 +126,10 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { str := fmt.Sprintf(` %s: endpoint: "%s" + %s + %s tls: - insecure: true`, bor.exporterType, addr) - + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index 74dcb78dd9f3..c765f334ed9e 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -51,6 +51,9 @@ type TestCase struct { doneSignal chan struct{} errorCause string resultsSummary TestResultsSummary + + // decision makes mockbackend return permanent/non-permament errors at random basis + decision decisionFunc } const mibibyte = 1024 * 1024 @@ -77,6 +80,7 @@ func NewTestCase( agentProc: agentProc, validator: validator, resultsSummary: resultsSummary, + decision: func() error { return nil }, } // Get requested test case duration from env variable. @@ -111,6 +115,7 @@ func NewTestCase( require.NoError(t, err, "Cannot create generator") tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) + tc.MockBackend.WithDecisionFunc(tc.decision) go tc.logStats() diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index 8ce6326d52ff..b22d10200e2d 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -80,13 +80,13 @@ func NewCorrectTestValidator(senderName string, receiverName string, provider Da func (v *CorrectnessTestValidator) Validate(tc *TestCase) { if assert.EqualValues(tc.t, - int64(tc.LoadGenerator.DataItemsSent()), + int64(tc.LoadGenerator.DataItemsSent())-int64(tc.LoadGenerator.PermanentErrors()), int64(tc.MockBackend.DataItemsReceived()), "Received and sent counters do not match.") { log.Printf("Sent and received data counters match.") } if len(tc.MockBackend.ReceivedTraces) > 0 { - v.assertSentRecdTracingDataEqual(tc.MockBackend.ReceivedTraces) + v.assertSentRecdTracingDataEqual(append(tc.MockBackend.ReceivedTraces, tc.MockBackend.DroppedTraces...)) } assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") }