Skip to content

Commit

Permalink
[chore][Testbed] - Try flushing the data before exiting in load gener…
Browse files Browse the repository at this point in the history
…ator (open-telemetry#35211)

**Description:** In the `LoadGenerator`, if we encounter a non-permanent
error and are in the process of retrying, we should not immediately exit
upon receiving a stop signal. Instead, we need to first flush the
existing data and then proceed to exit the LoadGenerator. This is
necessary for stress test cases and we need to validate the data
received and data sent.

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
VihasMakwana and djaglowski authored Sep 24, 2024
1 parent 2fb1c78 commit 2ac31be
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type LoadOptions struct {

// Parallel specifies how many goroutines to send from.
Parallel int

// MaxDelay defines the longest amount of time we can continue retrying for non-permanent errors.
MaxDelay time.Duration
}

var _ LoadGenerator = (*ProviderSender)(nil)
Expand Down Expand Up @@ -112,6 +115,11 @@ func (ps *ProviderSender) Start(options LoadOptions) {
ps.options.ItemsPerBatch = 10
}

if ps.options.MaxDelay == 0 {
// retry for an additional 10 seconds by default
ps.options.MaxDelay = time.Second * 10
}

log.Printf("Starting load generator at %d items/sec.", ps.options.DataItemsPerSecond)

// Indicate that generation is in progress.
Expand Down Expand Up @@ -240,6 +248,7 @@ func (ps *ProviderSender) generateTrace() error {
traceSender := ps.Sender.(TraceDataSender)

traceData, done := ps.Provider.GenerateTraces()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -258,9 +267,8 @@ func (ps *ProviderSender) generateTrace() error {
return fmt.Errorf("cannot send traces: %w", err)
}
ps.nonPermanentErrors.Add(uint64(traceData.SpanCount()))

select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand All @@ -271,6 +279,7 @@ func (ps *ProviderSender) generateMetrics() error {
metricSender := ps.Sender.(MetricDataSender)

metricData, done := ps.Provider.GenerateMetrics()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -291,7 +300,7 @@ func (ps *ProviderSender) generateMetrics() error {
ps.nonPermanentErrors.Add(uint64(metricData.DataPointCount()))

select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand All @@ -302,6 +311,7 @@ func (ps *ProviderSender) generateLog() error {
logSender := ps.Sender.(LogDataSender)

logData, done := ps.Provider.GenerateLogs()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -322,7 +332,7 @@ func (ps *ProviderSender) generateLog() error {
ps.nonPermanentErrors.Add(uint64(logData.LogRecordCount()))

select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand Down

0 comments on commit 2ac31be

Please sign in to comment.