Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[testbed] - Add scenarios to handle large files #34417

Merged
merged 15 commits into from
Sep 3, 2024
27 changes: 27 additions & 0 deletions .chloggen/add-large-file-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: testbed

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add test case scenarios to handle large files to existing testbed.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34288]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
72 changes: 72 additions & 0 deletions testbed/testbed/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"log"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)
Expand Down Expand Up @@ -561,3 +563,73 @@ func populateSpansMap(spansMap map[string]ptrace.Span, tds []ptrace.Traces) {
func traceIDAndSpanIDToString(traceID pcommon.TraceID, spanID pcommon.SpanID) string {
return fmt.Sprintf("%s-%s", traceID, spanID)
}

type CorrectnessLogTestValidator struct {
dataProvider DataProvider
}

func NewCorrectnessLogTestValidator(provider DataProvider) *CorrectnessLogTestValidator {
return &CorrectnessLogTestValidator{
dataProvider: provider,
}
}

func (c *CorrectnessLogTestValidator) Validate(tc *TestCase) {
if dataProvider, ok := c.dataProvider.(*perfTestDataProvider); ok {
logsReceived := tc.MockBackend.ReceivedLogs

idsSent := make([][2]string, 0)
idsReceived := make([][2]string, 0)

for batch := 0; batch < int(dataProvider.traceIDSequence.Load()); batch++ {
for idx := 0; idx < dataProvider.options.ItemsPerBatch; idx++ {
idsSent = append(idsSent, [2]string{"batch_" + strconv.Itoa(batch), "item_" + strconv.Itoa(idx)})
}
}
for _, log := range logsReceived {
for i := 0; i < log.ResourceLogs().Len(); i++ {
for j := 0; j < log.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
s := log.ResourceLogs().At(i).ScopeLogs().At(j)
for k := 0; k < s.LogRecords().Len(); k++ {
logRecord := s.LogRecords().At(k)
batchIndex, ok := logRecord.Attributes().Get("batch_index")
require.True(tc.t, ok, "batch_index missing from attributes; use perfDataProvider")
itemIndex, ok := logRecord.Attributes().Get("item_index")
require.True(tc.t, ok, "item_index missing from attributes; use perfDataProvider")

idsReceived = append(idsReceived, [2]string{batchIndex.Str(), itemIndex.Str()})
}
}
}
}

assert.ElementsMatch(tc.t, idsSent, idsReceived)
}
}

func (c *CorrectnessLogTestValidator) RecordResults(tc *TestCase) {
rc := tc.agentProc.GetTotalConsumption()

var result string
if tc.t.Failed() {
result = "FAIL"
} else {
result = "PASS"
}

// Remove "Test" prefix from test name.
testName := tc.t.Name()[4:]

tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{
testName: testName,
result: result,
receivedSpanCount: tc.MockBackend.DataItemsReceived(),
sentSpanCount: tc.LoadGenerator.DataItemsSent(),
duration: time.Since(tc.startTime),
cpuPercentageAvg: rc.CPUPercentAvg,
cpuPercentageMax: rc.CPUPercentMax,
ramMibAvg: rc.RAMMiBAvg,
ramMibMax: rc.RAMMiBMax,
errorCause: tc.errorCause,
})
}
117 changes: 117 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
package tests

import (
"context"
"path"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
Expand Down Expand Up @@ -233,3 +240,113 @@ func TestLogOtlpSendingQueue(t *testing.T) {
})

}

func TestLogLargeFiles(t *testing.T) {
tests := []struct {
name string
sender testbed.DataSender
receiver testbed.DataReceiver
loadOptions testbed.LoadOptions
sleepSeconds int
}{
{
/*
* The FileLogWriter generates strings almost 100 bytes each.
* With a rate of 200,000 lines per second over a duration of 100 seconds,
* this results in a file size of approximately 2GB over its lifetime.
*/
name: "filelog-largefiles-2Gb-lifetime",
sender: datasenders.NewFileLogWriter(),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
loadOptions: testbed.LoadOptions{
DataItemsPerSecond: 200000,
ItemsPerBatch: 1,
Parallel: 100,
},
sleepSeconds: 100,
},
{
/*
* The FileLogWriter generates strings almost 100 bytes each.
* With a rate of 330,000 lines per second over a duration of 200 seconds,
* this results in a file size of approximately 6GB over its lifetime.
*/
name: "filelog-largefiles-6GB-lifetime",
sender: datasenders.NewFileLogWriter(),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
loadOptions: testbed.LoadOptions{
DataItemsPerSecond: 330000,
ItemsPerBatch: 10,
Parallel: 10,
},
sleepSeconds: 200,
},
}
processors := map[string]string{
"batch": `
batch:
`}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ScenarioLong(
t,
test.sender,
test.receiver,
test.loadOptions,
performanceResultsSummary,
test.sleepSeconds,
processors,
)
})
}
}

func TestLargeFileOnce(t *testing.T) {
processors := map[string]string{
"batch": `
batch:
`,
}
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)
sender := datasenders.NewFileLogWriter()
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
loadOptions := testbed.LoadOptions{
DataItemsPerSecond: 1,
ItemsPerBatch: 10000000,
Parallel: 1,
}

// Write data at once, before starting up the collector
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataItemsGenerated := atomic.Uint64{}
dataProvider.SetLoadGeneratorCounters(&dataItemsGenerated)
ld, _ := dataProvider.GenerateLogs()
atoulme marked this conversation as resolved.
Show resolved Hide resolved

require.NoError(t, sender.ConsumeLogs(context.Background(), ld))
agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
performanceResultsSummary,
)
t.Cleanup(tc.Stop)

tc.StartBackend()
tc.StartAgent()

tc.WaitForN(func() bool { return dataItemsGenerated.Load() == tc.MockBackend.DataItemsReceived() }, 20*time.Second, "all logs received")

tc.StopAgent()
atoulme marked this conversation as resolved.
Show resolved Hide resolved
tc.ValidateData()
}
46 changes: 46 additions & 0 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,52 @@ func ScenarioSendingQueuesNotFull(
tc.ValidateData()
}

func ScenarioLong(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
)
t.Cleanup(tc.Stop)

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

tc.Sleep(time.Second * time.Duration(sleepTime))

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received")

tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
Expand Down