diff --git a/.chloggen/fix-parsing-error-windows-resourcedetection.yaml b/.chloggen/fix-parsing-error-windows-resourcedetection.yaml new file mode 100644 index 000000000000..e56b6e9e9eb8 --- /dev/null +++ b/.chloggen/fix-parsing-error-windows-resourcedetection.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/resourcedetection + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Don't parse the field `cpuInfo.Model` if it's blank. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27678] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index 9b6fa25bc64c..63aec3aaaddb 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -153,11 +153,16 @@ func setHostCPUInfo(d *Detector, cpuInfo cpu.InfoStat) error { } d.rb.SetHostCPUFamily(family) - model, err := strconv.ParseInt(cpuInfo.Model, 10, 64) - if err != nil { - return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err) + // For windows, this field is left blank. See https://github.com/shirou/gopsutil/blob/v3.23.9/cpu/cpu_windows.go#L113 + // Skip setting modelId if the field is blank. + // ISSUE: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27675 + if cpuInfo.Model != "" { + model, err := strconv.ParseInt(cpuInfo.Model, 10, 64) + if err != nil { + return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err) + } + d.rb.SetHostCPUModelID(model) } - d.rb.SetHostCPUModelID(model) d.rb.SetHostCPUModelName(cpuInfo.ModelName) d.rb.SetHostCPUStepping(int64(cpuInfo.Stepping)) diff --git a/testbed/correctnesstests/traces/correctness_test.go b/testbed/correctnesstests/traces/correctness_test.go index 713042848e11..5a1aab46b516 100644 --- a/testbed/correctnesstests/traces/correctness_test.go +++ b/testbed/correctnesstests/traces/correctness_test.go @@ -92,3 +92,63 @@ func testWithTracingGoldenDataset( tc.ValidateData() } + +func TestSporadicGoldenDataset(t *testing.T) { + testCases := []struct { + decisionFunc func() error + }{ + { + decisionFunc: testbed.RandomNonPermanentError, + }, + { + decisionFunc: testbed.RandomPermanentError, + }, + } + for _, tt := range testCases { + factories, err := testbed.Components() + require.NoError(t, err, "default components resulted in: %v", err) + runner := testbed.NewInProcessCollector(factories) + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewGoldenDataProvider( + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt", + "../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_spans.txt", + "") + sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: false +`) + receiver.WithQueue(` + sending_queue: + enabled: false +`) + _, err = runner.PrepareConfig(correctnesstests.CreateConfigYaml(sender, receiver, nil, "traces")) + require.NoError(t, err, "collector configuration resulted in: %v", err) + validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), dataProvider) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + runner, + validator, + correctnessResults, + testbed.WithSkipResults(), + testbed.WithDecisionFunc(tt.decisionFunc), + ) + defer tc.Stop() + tc.StartBackend() + tc.StartAgent() + tc.StartLoad(options) + tc.Sleep(3 * time.Second) + + tc.StopLoad() + + tc.WaitForN(func() bool { + return tc.LoadGenerator.DataItemsSent()-tc.LoadGenerator.PermanentErrors() == tc.MockBackend.DataItemsReceived() + }, 5*time.Second, "all data items received") + tc.StopAgent() + tc.ValidateData() + } +} diff --git a/testbed/go.mod b/testbed/go.mod index 946145131893..7a2cac8a9510 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -55,6 +55,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/text v0.13.0 + google.golang.org/grpc v1.58.3 ) require ( @@ -259,7 +260,6 @@ require ( google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect - google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/testbed/testbed/load_generator.go b/testbed/testbed/load_generator.go index 7172d82045b2..2bf3dcb6b78f 100644 --- a/testbed/testbed/load_generator.go +++ b/testbed/testbed/load_generator.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/consumer/consumererror" "golang.org/x/text/message" ) @@ -25,6 +26,10 @@ type LoadGenerator struct { // Number of data items (spans or metric data points) sent. dataItemsSent atomic.Uint64 + // Number of permanent errors received + permanentErrors atomic.Uint64 + nonPermanentErrors atomic.Uint64 + stopOnce sync.Once stopWait sync.WaitGroup stopSignal chan struct{} @@ -109,6 +114,14 @@ func (lg *LoadGenerator) DataItemsSent() uint64 { return lg.dataItemsSent.Load() } +func (lg *LoadGenerator) PermanentErrors() uint64 { + return lg.permanentErrors.Load() +} + +func (lg *LoadGenerator) NonPermanentErrors() uint64 { + return lg.nonPermanentErrors.Load() +} + // IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data // directly via TestCases's Sender. This is necessary so that the total number of sent // items in the end is correct, because the reports are printed from LoadGenerator's @@ -184,12 +197,26 @@ func (lg *LoadGenerator) generateTrace() { return } - err := traceSender.ConsumeTraces(context.Background(), traceData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send traces: %v", err) + for { + err := traceSender.ConsumeTraces(context.Background(), traceData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(traceData.SpanCount())) + continue + } + + lg.permanentErrors.Add(uint64(traceData.SpanCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send traces: %v", err) + } + break } } @@ -200,13 +227,26 @@ func (lg *LoadGenerator) generateMetrics() { if done { return } - - err := metricSender.ConsumeMetrics(context.Background(), metricData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send metrics: %v", err) + for { + err := metricSender.ConsumeMetrics(context.Background(), metricData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(metricData.DataPointCount())) + continue + } + + lg.permanentErrors.Add(uint64(metricData.DataPointCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send metrics: %v", err) + } + break } } @@ -217,12 +257,25 @@ func (lg *LoadGenerator) generateLog() { if done { return } - - err := logSender.ConsumeLogs(context.Background(), logData) - if err == nil { - lg.prevErr = nil - } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { - lg.prevErr = err - log.Printf("Cannot send logs: %v", err) + for { + err := logSender.ConsumeLogs(context.Background(), logData) + if err == nil { + lg.prevErr = nil + break + } + + if !consumererror.IsPermanent(err) { + lg.nonPermanentErrors.Add(uint64(logData.LogRecordCount())) + continue + } + + lg.permanentErrors.Add(uint64(logData.LogRecordCount())) + + // update prevErr to err if it's different than last observed error + if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { + lg.prevErr = err + log.Printf("Cannot send logs: %v", err) + } + break } } diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index 751ac496ec38..e6e932985633 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -5,18 +5,28 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" + "errors" "log" + "math/rand" "os" "sync" "sync/atomic" "time" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +var errNonPermanent = errors.New("non permanent error") +var errPermanent = errors.New("permanent error") + +type decisionFunc func() error + // MockBackend is a backend that allows receiving the data locally. type MockBackend struct { // Metric and trace consumers @@ -41,6 +51,13 @@ type MockBackend struct { ReceivedTraces []ptrace.Traces ReceivedMetrics []pmetric.Metrics ReceivedLogs []plog.Logs + + DroppedTraces []ptrace.Traces + DroppedMetrics []pmetric.Metrics + DroppedLogs []plog.Logs + + // decision to return permanent/non-permanent errors + decision decisionFunc } // NewMockBackend creates a new mock backend that receives data using specified receiver. @@ -51,6 +68,7 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { tc: &MockTraceConsumer{}, mc: &MockMetricConsumer{}, lc: &MockLogConsumer{}, + decision: func() error { return nil }, } mb.tc.backend = mb mb.mc.backend = mb @@ -58,6 +76,10 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend { return mb } +func (mb *MockBackend) WithDecisionFunc(decision decisionFunc) { + mb.decision = decision +} + // Start a backend. func (mb *MockBackend) Start() error { log.Printf("Starting mock backend...") @@ -161,6 +183,13 @@ func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities { } func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + if err := tc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && tc.backend.isRecording { + tc.backend.DroppedTraces = append(tc.backend.DroppedTraces, td) + } + return err + } + tc.numSpansReceived.Add(uint64(td.SpanCount())) rs := td.ResourceSpans() @@ -208,6 +237,13 @@ func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities { } func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + if err := mc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && mc.backend.isRecording { + mc.backend.DroppedMetrics = append(mc.backend.DroppedMetrics, md) + } + return err + } + mc.numMetricsReceived.Add(uint64(md.DataPointCount())) mc.backend.ConsumeMetric(md) return nil @@ -233,8 +269,35 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities { } func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error { + if err := lc.backend.decision(); err != nil { + if consumererror.IsPermanent(err) && lc.backend.isRecording { + lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld) + } + return err + } + recordCount := ld.LogRecordCount() lc.numLogRecordsReceived.Add(uint64(recordCount)) lc.backend.ConsumeLogs(ld) return nil } + +// randomNonPermanentError is a decision function that succeeds approximately +// half of the time and fails with a non-permanent error the rest of the time. +func RandomNonPermanentError() error { + code := codes.Unavailable + s := status.New(code, errNonPermanent.Error()) + if rand.Float32() < 0.5 { + return s.Err() + } + return nil +} + +// randomPermanentError is a decision function that succeeds approximately +// half of the time and fails with a permanent error the rest of the time. +func RandomPermanentError() error { + if rand.Float32() < 0.5 { + return consumererror.NewPermanent(errPermanent) + } + return nil +} diff --git a/testbed/testbed/options.go b/testbed/testbed/options.go index 3b0e51d6f975..ac37313f466f 100644 --- a/testbed/testbed/options.go +++ b/testbed/testbed/options.go @@ -66,3 +66,10 @@ func WithResourceLimits(resourceSpec ResourceSpec) TestCaseOption { } } } + +// WithDecision enables our mock backend to behave sporadically +func WithDecisionFunc(decision decisionFunc) TestCaseOption { + return func(tc *TestCase) { + tc.decision = decision + } +} diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 8c14efe4dffc..59937042e358 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -53,6 +53,8 @@ type BaseOTLPDataReceiver struct { metricsReceiver receiver.Metrics logReceiver receiver.Logs compression string + retry string + sendingQueue string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -91,6 +93,16 @@ func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDa return bor } +func (bor *BaseOTLPDataReceiver) WithRetry(retry string) *BaseOTLPDataReceiver { + bor.retry = retry + return bor +} + +func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver { + bor.sendingQueue = sendingQueue + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { if err := bor.traceReceiver.Shutdown(context.Background()); err != nil { return err @@ -114,9 +126,10 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { str := fmt.Sprintf(` %s: endpoint: "%s" + %s + %s tls: - insecure: true`, bor.exporterType, addr) - + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index 74dcb78dd9f3..c765f334ed9e 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -51,6 +51,9 @@ type TestCase struct { doneSignal chan struct{} errorCause string resultsSummary TestResultsSummary + + // decision makes mockbackend return permanent/non-permament errors at random basis + decision decisionFunc } const mibibyte = 1024 * 1024 @@ -77,6 +80,7 @@ func NewTestCase( agentProc: agentProc, validator: validator, resultsSummary: resultsSummary, + decision: func() error { return nil }, } // Get requested test case duration from env variable. @@ -111,6 +115,7 @@ func NewTestCase( require.NoError(t, err, "Cannot create generator") tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) + tc.MockBackend.WithDecisionFunc(tc.decision) go tc.logStats() diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index 8ce6326d52ff..b22d10200e2d 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -80,13 +80,13 @@ func NewCorrectTestValidator(senderName string, receiverName string, provider Da func (v *CorrectnessTestValidator) Validate(tc *TestCase) { if assert.EqualValues(tc.t, - int64(tc.LoadGenerator.DataItemsSent()), + int64(tc.LoadGenerator.DataItemsSent())-int64(tc.LoadGenerator.PermanentErrors()), int64(tc.MockBackend.DataItemsReceived()), "Received and sent counters do not match.") { log.Printf("Sent and received data counters match.") } if len(tc.MockBackend.ReceivedTraces) > 0 { - v.assertSentRecdTracingDataEqual(tc.MockBackend.ReceivedTraces) + v.assertSentRecdTracingDataEqual(append(tc.MockBackend.ReceivedTraces, tc.MockBackend.DroppedTraces...)) } assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") }