[testbed] - Add scenarios to handle large files #34417

Merged
15 commits merged on Sep 3, 2024
72 changes: 72 additions & 0 deletions testbed/testbed/validator.go
@@ -8,10 +8,12 @@
"log"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)
@@ -561,3 +563,73 @@
func traceIDAndSpanIDToString(traceID pcommon.TraceID, spanID pcommon.SpanID) string {
return fmt.Sprintf("%s-%s", traceID, spanID)
}

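// CorrectnessLogTestValidator checks that every log record generated by the
// perfTestDataProvider was received by the mock backend, matching records on
// their batch_index and item_index attributes.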
type CorrectnessLogTestValidator struct {
dataProvider DataProvider
}

func NewCorrectnessLogTestValidator(provider DataProvider) *CorrectnessLogTestValidator {
return &CorrectnessLogTestValidator{
dataProvider: provider,
}
}

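// Validate reconstructs the set of (batch_index, item_index) pairs the
// perfTestDataProvider generated and asserts that the mock backend received
// exactly the same set, in any order.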
func (c *CorrectnessLogTestValidator) Validate(tc *TestCase) {
if dataProvider, ok := c.dataProvider.(*perfTestDataProvider); ok {
logsReceived := tc.MockBackend.ReceivedLogs

idsSent := make([][2]string, 0)
idsReceived := make([][2]string, 0)

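// The data provider increments traceIDSequence once per generated batch
// (for logs as well as traces), so it serves as the sent-batch count here.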
for batch := 0; batch < int(dataProvider.traceIDSequence.Load()); batch++ {
for idx := 0; idx < dataProvider.options.ItemsPerBatch; idx++ {
idsSent = append(idsSent, [2]string{"batch_" + strconv.Itoa(batch), "item_" + strconv.Itoa(idx)})
}
}
for _, log := range logsReceived {
for i := 0; i < log.ResourceLogs().Len(); i++ {
for j := 0; j < log.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
s := log.ResourceLogs().At(i).ScopeLogs().At(j)
for k := 0; k < s.LogRecords().Len(); k++ {
logRecord := s.LogRecords().At(k)
batchIndex, ok := logRecord.Attributes().Get("batch_index")
require.True(tc.t, ok, "batch_index missing from attributes; use perfDataProvider")
itemIndex, ok := logRecord.Attributes().Get("item_index")
require.True(tc.t, ok, "item_index missing from attributes; use perfDataProvider")

idsReceived = append(idsReceived, [2]string{batchIndex.Str(), itemIndex.Str()})
}
}
}
}

assert.ElementsMatch(tc.t, idsSent, idsReceived)
}
}

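// RecordResults adds a PerformanceTestResult entry (pass/fail, item counts,
// duration, and the collector's CPU/RAM consumption) to the results summary.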
func (c *CorrectnessLogTestValidator) RecordResults(tc *TestCase) {
rc := tc.agentProc.GetTotalConsumption()

var result string
if tc.t.Failed() {
result = "FAIL"
} else {
result = "PASS"
}

// Remove "Test" prefix from test name.
testName := tc.t.Name()[4:]

tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{
testName: testName,
result: result,
receivedSpanCount: tc.MockBackend.DataItemsReceived(),
sentSpanCount: tc.LoadGenerator.DataItemsSent(),
duration: time.Since(tc.startTime),
cpuPercentageAvg: rc.CPUPercentAvg,
cpuPercentageMax: rc.CPUPercentMax,
ramMibAvg: rc.RAMMiBAvg,
ramMibMax: rc.RAMMiBMax,
errorCause: tc.errorCause,
})
}
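For reference, a minimal wiring sketch (not part of this diff) of how the new validator is intended to be attached to a test case; loadOptions, sender, receiver, agentProc, and resultsSummary stand in for values a test already has:

// Validate only runs its cross-check when the validator was constructed with
// the perfTestDataProvider, so the same provider that drives the load
// generator is passed to the constructor.
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
validator := testbed.NewCorrectnessLogTestValidator(dataProvider)
tc := testbed.NewTestCase(t, dataProvider, sender, receiver, agentProc, validator, resultsSummary)
defer tc.Stop()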
121 changes: 121 additions & 0 deletions testbed/tests/log_test.go
@@ -7,12 +7,20 @@
package tests

import (
"context"
"fmt"
"path"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestLog10kDPS(t *testing.T) {
@@ -233,3 +241,116 @@ func TestLogOtlpSendingQueue(t *testing.T) {
})

}

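// TestLogLargeFiles drives the filelog sender at sustained high rates, covering
// roughly 2 GB and 6 GB of log data over a run's lifetime plus a 50 MB/sec case.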
func TestLogLargeFiles(t *testing.T) {
tests := []struct {
name string
sender testbed.DataSender
receiver testbed.DataReceiver
loadOptions testbed.LoadOptions
sleepSeconds int
}{
{
name: "filelog-largefiles-2Gb-lifetime",
sender: datasenders.NewFileLogWriter(),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
loadOptions: testbed.LoadOptions{
DataItemsPerSecond: 200000,
ItemsPerBatch: 1,
Parallel: 100,
},
sleepSeconds: 100,
},
{
name: "filelog-largefiles-6GB-lifetime",
sender: datasenders.NewFileLogWriter(),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
loadOptions: testbed.LoadOptions{
DataItemsPerSecond: 330000,
ItemsPerBatch: 10,
Parallel: 10,
},
sleepSeconds: 200,
},
{
name: "filelog-largefiles-50MB/sec",
sender: datasenders.NewFileLogWriter(),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
loadOptions: testbed.LoadOptions{
DataItemsPerSecond: 400000,
ItemsPerBatch: 100,
Parallel: 1,
},
sleepSeconds: 100,
},
}
processors := map[string]string{
"batch": `
batch:
`}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ScenarioLong(
t,
test.sender,
test.receiver,
test.loadOptions,
performanceResultsSummary,
test.sleepSeconds,
processors,
)
})
}
}

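// TestLargeFileOnce writes one large log file in a single shot before the
// collector starts, then verifies that every record reaches the mock backend.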
func TestLargeFileOnce(t *testing.T) {
processors := map[string]string{
"batch": `
batch:
`,
}
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)
sender := datasenders.NewFileLogWriter()
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
loadOptions := testbed.LoadOptions{
DataItemsPerSecond: 1,
ItemsPerBatch: 10000000,
Parallel: 1,
}

// Write data at once, before starting up the collector
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataItemsGenerated := atomic.Uint64{}
dataProvider.SetLoadGeneratorCounters(&dataItemsGenerated)
ld, _ := dataProvider.GenerateLogs()

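// Report the serialized size of the pre-generated payload so the test output
// records how much data was written before the collector started.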
m := &plog.ProtoMarshaler{}
fmt.Println(m.LogsSize(ld))
require.NoError(t, sender.ConsumeLogs(context.Background(), ld))
agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
performanceResultsSummary,
)
defer tc.Stop()

tc.StartBackend()
tc.StartAgent()

tc.WaitForN(func() bool { return dataItemsGenerated.Load() == tc.MockBackend.DataItemsReceived() }, 20*time.Second, "all logs received")

tc.StopAgent()
tc.ValidateData()
}
47 changes: 47 additions & 0 deletions testbed/tests/scenarios.go
@@ -501,6 +501,53 @@ func ScenarioSendingQueuesNotFull(
tc.ValidateData()
}

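// ScenarioLong runs a long-lived load test: it starts the collector, applies the
// given load for sleepTime seconds, then verifies that the mock backend received
// every item the load generator sent.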
func ScenarioLong(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
)
defer tc.Stop()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

tc.Sleep(time.Second * time.Duration(sleepTime))

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all data items received")

tc.StopAgent()
tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)