Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][Testbed] - Try flushing the data before exiting in load generator #35211

Merged
18 changes: 14 additions & 4 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type LoadOptions struct {

// Parallel specifies how many goroutines to send from.
Parallel int

// MaxDelay defines the longest amount of time we can continue retrying for non-permanent errors.
MaxDelay time.Duration
}

var _ LoadGenerator = (*ProviderSender)(nil)
Expand Down Expand Up @@ -112,6 +115,11 @@ func (ps *ProviderSender) Start(options LoadOptions) {
ps.options.ItemsPerBatch = 10
}

if ps.options.MaxDelay == 0 {
// retry for an additional 10 seconds by default
ps.options.MaxDelay = time.Second * 10
}

log.Printf("Starting load generator at %d items/sec.", ps.options.DataItemsPerSecond)

// Indicate that generation is in progress.
Expand Down Expand Up @@ -240,6 +248,7 @@ func (ps *ProviderSender) generateTrace() error {
traceSender := ps.Sender.(TraceDataSender)

traceData, done := ps.Provider.GenerateTraces()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -258,9 +267,8 @@ func (ps *ProviderSender) generateTrace() error {
return fmt.Errorf("cannot send traces: %w", err)
}
ps.nonPermanentErrors.Add(uint64(traceData.SpanCount()))

VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand All @@ -271,6 +279,7 @@ func (ps *ProviderSender) generateMetrics() error {
metricSender := ps.Sender.(MetricDataSender)

metricData, done := ps.Provider.GenerateMetrics()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -291,7 +300,7 @@ func (ps *ProviderSender) generateMetrics() error {
ps.nonPermanentErrors.Add(uint64(metricData.DataPointCount()))

select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand All @@ -302,6 +311,7 @@ func (ps *ProviderSender) generateLog() error {
logSender := ps.Sender.(LogDataSender)

logData, done := ps.Provider.GenerateLogs()
timer := time.NewTimer(ps.options.MaxDelay)
if done {
return nil
}
Expand All @@ -322,7 +332,7 @@ func (ps *ProviderSender) generateLog() error {
ps.nonPermanentErrors.Add(uint64(logData.LogRecordCount()))

select {
case <-ps.stopSignal:
case <-timer.C:
return nil
default:
}
Expand Down