Skip to content

Commit

Permalink
Merge branch 'main' into feature/hzahav/async-udp-reader-conccurency
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 16, 2023
2 parents 49aee3a + 3c2935f commit 68b97d5
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 29 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-parsing-error-windows-resourcedetection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/resourcedetection

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Don't parse the field `cpuInfo.Model` if it's blank.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27678]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
13 changes: 9 additions & 4 deletions processor/resourcedetectionprocessor/internal/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,16 @@ func setHostCPUInfo(d *Detector, cpuInfo cpu.InfoStat) error {
}
d.rb.SetHostCPUFamily(family)

model, err := strconv.ParseInt(cpuInfo.Model, 10, 64)
if err != nil {
return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err)
// For windows, this field is left blank. See https://github.com/shirou/gopsutil/blob/v3.23.9/cpu/cpu_windows.go#L113
// Skip setting modelId if the field is blank.
// ISSUE: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27675
if cpuInfo.Model != "" {
model, err := strconv.ParseInt(cpuInfo.Model, 10, 64)
if err != nil {
return fmt.Errorf("failed to convert cpuinfo model to integer: %w", err)
}
d.rb.SetHostCPUModelID(model)
}
d.rb.SetHostCPUModelID(model)

d.rb.SetHostCPUModelName(cpuInfo.ModelName)
d.rb.SetHostCPUStepping(int64(cpuInfo.Stepping))
Expand Down
60 changes: 60 additions & 0 deletions testbed/correctnesstests/traces/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,63 @@ func testWithTracingGoldenDataset(

tc.ValidateData()
}

func TestSporadicGoldenDataset(t *testing.T) {
testCases := []struct {
decisionFunc func() error
}{
{
decisionFunc: testbed.RandomNonPermanentError,
},
{
decisionFunc: testbed.RandomPermanentError,
},
}
for _, tt := range testCases {
factories, err := testbed.Components()
require.NoError(t, err, "default components resulted in: %v", err)
runner := testbed.NewInProcessCollector(factories)
options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10}
dataProvider := testbed.NewGoldenDataProvider(
"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt",
"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_spans.txt",
"")
sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
receiver.WithRetry(`
retry_on_failure:
enabled: false
`)
receiver.WithQueue(`
sending_queue:
enabled: false
`)
_, err = runner.PrepareConfig(correctnesstests.CreateConfigYaml(sender, receiver, nil, "traces"))
require.NoError(t, err, "collector configuration resulted in: %v", err)
validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), dataProvider)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
runner,
validator,
correctnessResults,
testbed.WithSkipResults(),
testbed.WithDecisionFunc(tt.decisionFunc),
)
defer tc.Stop()
tc.StartBackend()
tc.StartAgent()
tc.StartLoad(options)
tc.Sleep(3 * time.Second)

tc.StopLoad()

tc.WaitForN(func() bool {
return tc.LoadGenerator.DataItemsSent()-tc.LoadGenerator.PermanentErrors() == tc.MockBackend.DataItemsReceived()
}, 5*time.Second, "all data items received")
tc.StopAgent()
tc.ValidateData()
}
}
2 changes: 1 addition & 1 deletion testbed/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/text v0.13.0
google.golang.org/grpc v1.58.3
)

require (
Expand Down Expand Up @@ -259,7 +260,6 @@ require (
google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
93 changes: 73 additions & 20 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/collector/consumer/consumererror"
"golang.org/x/text/message"
)

Expand All @@ -25,6 +26,10 @@ type LoadGenerator struct {
// Number of data items (spans or metric data points) sent.
dataItemsSent atomic.Uint64

// Number of permanent errors received
permanentErrors atomic.Uint64
nonPermanentErrors atomic.Uint64

stopOnce sync.Once
stopWait sync.WaitGroup
stopSignal chan struct{}
Expand Down Expand Up @@ -109,6 +114,14 @@ func (lg *LoadGenerator) DataItemsSent() uint64 {
return lg.dataItemsSent.Load()
}

func (lg *LoadGenerator) PermanentErrors() uint64 {
return lg.permanentErrors.Load()
}

func (lg *LoadGenerator) NonPermanentErrors() uint64 {
return lg.nonPermanentErrors.Load()
}

// IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data
// directly via TestCases's Sender. This is necessary so that the total number of sent
// items in the end is correct, because the reports are printed from LoadGenerator's
Expand Down Expand Up @@ -184,12 +197,26 @@ func (lg *LoadGenerator) generateTrace() {
return
}

err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send traces: %v", err)
for {
err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(traceData.SpanCount()))
continue
}

lg.permanentErrors.Add(uint64(traceData.SpanCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send traces: %v", err)
}
break
}
}

Expand All @@ -200,13 +227,26 @@ func (lg *LoadGenerator) generateMetrics() {
if done {
return
}

err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send metrics: %v", err)
for {
err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(metricData.DataPointCount()))
continue
}

lg.permanentErrors.Add(uint64(metricData.DataPointCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send metrics: %v", err)
}
break
}
}

Expand All @@ -217,12 +257,25 @@ func (lg *LoadGenerator) generateLog() {
if done {
return
}

err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send logs: %v", err)
for {
err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(logData.LogRecordCount()))
continue
}

lg.permanentErrors.Add(uint64(logData.LogRecordCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send logs: %v", err)
}
break
}
}
63 changes: 63 additions & 0 deletions testbed/testbed/mock_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,28 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"log"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var errNonPermanent = errors.New("non permanent error")
var errPermanent = errors.New("permanent error")

type decisionFunc func() error

// MockBackend is a backend that allows receiving the data locally.
type MockBackend struct {
// Metric and trace consumers
Expand All @@ -41,6 +51,13 @@ type MockBackend struct {
ReceivedTraces []ptrace.Traces
ReceivedMetrics []pmetric.Metrics
ReceivedLogs []plog.Logs

DroppedTraces []ptrace.Traces
DroppedMetrics []pmetric.Metrics
DroppedLogs []plog.Logs

// decision to return permanent/non-permanent errors
decision decisionFunc
}

// NewMockBackend creates a new mock backend that receives data using specified receiver.
Expand All @@ -51,13 +68,18 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend {
tc: &MockTraceConsumer{},
mc: &MockMetricConsumer{},
lc: &MockLogConsumer{},
decision: func() error { return nil },
}
mb.tc.backend = mb
mb.mc.backend = mb
mb.lc.backend = mb
return mb
}

func (mb *MockBackend) WithDecisionFunc(decision decisionFunc) {
mb.decision = decision
}

// Start a backend.
func (mb *MockBackend) Start() error {
log.Printf("Starting mock backend...")
Expand Down Expand Up @@ -161,6 +183,13 @@ func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities {
}

func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
if err := tc.backend.decision(); err != nil {
if consumererror.IsPermanent(err) && tc.backend.isRecording {
tc.backend.DroppedTraces = append(tc.backend.DroppedTraces, td)
}
return err
}

tc.numSpansReceived.Add(uint64(td.SpanCount()))

rs := td.ResourceSpans()
Expand Down Expand Up @@ -208,6 +237,13 @@ func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities {
}

func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
if err := mc.backend.decision(); err != nil {
if consumererror.IsPermanent(err) && mc.backend.isRecording {
mc.backend.DroppedMetrics = append(mc.backend.DroppedMetrics, md)
}
return err
}

mc.numMetricsReceived.Add(uint64(md.DataPointCount()))
mc.backend.ConsumeMetric(md)
return nil
Expand All @@ -233,8 +269,35 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities {
}

func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
if err := lc.backend.decision(); err != nil {
if consumererror.IsPermanent(err) && lc.backend.isRecording {
lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld)
}
return err
}

recordCount := ld.LogRecordCount()
lc.numLogRecordsReceived.Add(uint64(recordCount))
lc.backend.ConsumeLogs(ld)
return nil
}

// randomNonPermanentError is a decision function that succeeds approximately
// half of the time and fails with a non-permanent error the rest of the time.
func RandomNonPermanentError() error {
code := codes.Unavailable
s := status.New(code, errNonPermanent.Error())
if rand.Float32() < 0.5 {
return s.Err()
}
return nil
}

// randomPermanentError is a decision function that succeeds approximately
// half of the time and fails with a permanent error the rest of the time.
func RandomPermanentError() error {
if rand.Float32() < 0.5 {
return consumererror.NewPermanent(errPermanent)
}
return nil
}
7 changes: 7 additions & 0 deletions testbed/testbed/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,10 @@ func WithResourceLimits(resourceSpec ResourceSpec) TestCaseOption {
}
}
}

// WithDecision enables our mock backend to behave sporadically
func WithDecisionFunc(decision decisionFunc) TestCaseOption {
return func(tc *TestCase) {
tc.decision = decision
}
}
Loading

0 comments on commit 68b97d5

Please sign in to comment.