Skip to content

Commit

Permalink
DSET-4359: Fix flaky tests (#46)
Browse files Browse the repository at this point in the history
* DSET-4359: Fix flaky tests

* Fix failing tests

* Increase duration of running tests

* Try to increase timeout for stress tests.

* Increase timeout and decrease lifetime

* Another attempt to fix flaky tests

* Include max elapsed time into more places

* Increase timeout for test with large event

* Increase version to 0.12.0
  • Loading branch information
martin-majlis-s1 authored Jul 31, 2023
1 parent aca8af3 commit 9c8ae03
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 63 deletions.
13 changes: 7 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ test-many-times:
else \
COUNT=$(COUNT); \
fi; \
prefix=out-test-many-times-; \
rm -rfv $${prefix}*; \
for i in `seq 1 $${COUNT}`; do \
echo "Running test $${i} / $${COUNT}"; \
rm -rfv out-test-$${i}.log; \
make test 2>&1 | tee out-test-$${i}.log; \
echo; \
grep -H FAIL out-test-$${i}.log; \
echo "Running test $${i} / $${COUNT} - BEGIN"; \
make test 2>&1 | tee $${prefix}-$${i}.log | awk '{print "'$${i}'/'$${COUNT}'", $$0; }' ; \
echo; \
grep -H FAIL $${prefix}-$${i}.log; \
echo "Running test $${i} / $${COUNT} - END"; \
done; \
echo "Grep for FAIL - no lines should be found"; \
! grep -H FAIL out-test-*.log;
! grep -H FAIL $${prefix}-*.log;

foo:
! false
Expand Down
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release Notes

## 0.12.0

* fix: make client shutdown timeout configurable.

## 0.0.11

* feat: make client shutdown timeout configurable.
Expand Down
26 changes: 18 additions & 8 deletions pkg/buffer_config/buffer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
const (
ShouldSentBufferSize = 5 * 1024 * 1024
// LimitBufferSize defines maximum payload size (before compression) for REST API
LimitBufferSize = 5*1024*1024 + 960*1024
MinimalMaxElapsedTime = time.Second
MinimalMaxInterval = time.Second
MinimalInitialInterval = 50 * time.Millisecond
MinimalMultiplier = 0.0
MinimalRandomizationFactor = 0.0
LimitBufferSize = 5*1024*1024 + 960*1024
MinimalMaxElapsedTime = time.Second
MinimalMaxInterval = time.Second
MinimalInitialInterval = 50 * time.Millisecond
MinimalMultiplier = 0.0
MinimalRandomizationFactor = 0.0
MinimalRetryShutdownTimeout = 2 * MinimalMaxElapsedTime
)

type DataSetBufferSettings struct {
Expand Down Expand Up @@ -147,7 +148,7 @@ func (cfg *DataSetBufferSettings) WithOptions(opts ...DataSetBufferSettingsOptio

func (cfg *DataSetBufferSettings) String() string {
return fmt.Sprintf(
"MaxLifetime: %s, MaxSize: %d, GroupBy: %s, RetryRandomizationFactor: %f, RetryMultiplier: %f, RetryInitialInterval: %s, RetryMaxInterval: %s, RetryMaxElapsedTime: %s",
"MaxLifetime: %s, MaxSize: %d, GroupBy: %s, RetryRandomizationFactor: %f, RetryMultiplier: %f, RetryInitialInterval: %s, RetryMaxInterval: %s, RetryMaxElapsedTime: %s, RetryShutdownTimeout: %s",
cfg.MaxLifetime,
cfg.MaxSize,
cfg.GroupBy,
Expand All @@ -156,6 +157,7 @@ func (cfg *DataSetBufferSettings) String() string {
cfg.RetryInitialInterval,
cfg.RetryMaxInterval,
cfg.RetryMaxElapsedTime,
cfg.RetryShutdownTimeout,
)
}

Expand Down Expand Up @@ -202,11 +204,19 @@ func (cfg *DataSetBufferSettings) Validate() error {

if cfg.RetryRandomizationFactor <= MinimalRandomizationFactor {
return fmt.Errorf(
"RetryRandomizationFactor has value %f which is less or equal than %f",
"RetryRandomizationFactor has value %f which is less or equal than %f",
cfg.RetryRandomizationFactor,
MinimalRandomizationFactor,
)
}

if cfg.RetryShutdownTimeout < MinimalRetryShutdownTimeout {
return fmt.Errorf(
"RetryShutdownTimeout has value %s which is less than %s",
cfg.RetryShutdownTimeout,
MinimalRetryShutdownTimeout,
)
}

return nil
}
9 changes: 8 additions & 1 deletion pkg/buffer_config/buffer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestConfigWithOptions(t *testing.T) {
WithRetryInitialInterval(8*time.Second),
WithRetryMaxInterval(30*time.Second),
WithRetryMaxElapsedTime(10*time.Minute),
WithRetryShutdownTimeout(2*time.Minute),
)

assert.Nil(t, errB)
Expand All @@ -41,6 +42,7 @@ func TestConfigWithOptions(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)
}

Expand All @@ -52,6 +54,7 @@ func TestDataConfigUpdate(t *testing.T) {
WithRetryInitialInterval(8*time.Second),
WithRetryMaxInterval(30*time.Second),
WithRetryMaxElapsedTime(10*time.Minute),
WithRetryShutdownTimeout(2*time.Minute),
)
assert.Nil(t, errB)

Expand All @@ -62,6 +65,7 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)

bufCfg2, err := bufCfg.WithOptions(
Expand All @@ -71,6 +75,7 @@ func TestDataConfigUpdate(t *testing.T) {
WithRetryInitialInterval(28*time.Second),
WithRetryMaxInterval(230*time.Second),
WithRetryMaxElapsedTime(210*time.Minute),
WithRetryShutdownTimeout(5*time.Minute),
)
assert.Nil(t, err)

Expand All @@ -82,6 +87,7 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)

// new config is changed
Expand All @@ -92,12 +98,13 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 28 * time.Second,
RetryMaxInterval: 230 * time.Second,
RetryMaxElapsedTime: 210 * time.Minute,
RetryShutdownTimeout: 5 * time.Minute,
}, *bufCfg2)
}

func TestDataConfigNewDefaultToString(t *testing.T) {
cfg := NewDefaultDataSetBufferSettings()
assert.Equal(t, "MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s", cfg.String())
assert.Equal(t, "MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s", cfg.String())
}

func TestDataConfigNewDefaultIsValid(t *testing.T) {
Expand Down
105 changes: 68 additions & 37 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,28 +217,38 @@ func (client *DataSetClient) isProcessingEvents() bool {
// - tries (with 2nd half of shutdownMaxTimeout period) to send processed events (buffers) into DataSet
func (client *DataSetClient) Shutdown() error {
client.Logger.Info("Shutting down - BEGIN")
// start measuring processing time
processingStart := time.Now()

// mark as finished to prevent processing of further events
client.finished.Store(true)

// log statistics when finish was called
client.logStatistics()

retryShutdownTimeout := client.Config.BufferSettings.RetryShutdownTimeout
maxElapsedTime := retryShutdownTimeout/2 - 100*time.Millisecond
client.Logger.Info(
"Shutting down - waiting for events",
zap.Duration("maxElapsedTime", maxElapsedTime),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)

var lastError error = nil
shutdownTimeout := minDuration(client.Config.BufferSettings.RetryMaxElapsedTime, client.Config.BufferSettings.RetryShutdownTimeout)
expBackoff := backoff.ExponentialBackOff{
InitialInterval: client.Config.BufferSettings.RetryInitialInterval,
RandomizationFactor: client.Config.BufferSettings.RetryRandomizationFactor,
Multiplier: client.Config.BufferSettings.RetryMultiplier,
MaxInterval: client.Config.BufferSettings.RetryMaxInterval,
MaxElapsedTime: shutdownTimeout / 2,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

// try (with timeout) to process (add into buffers) events,
retryNum := 0
processingStart := time.Now()
for client.isProcessingEvents() {
// log statistics
client.logStatistics()
Expand All @@ -250,38 +260,40 @@ func (client *DataSetClient) Shutdown() error {
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
if backoffDelay == expBackoff.Stop {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.eventsEnqueued.Load()-client.eventsProcessed.Load(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
)
break
}
time.Sleep(backoffDelay)
retryNum++
}

// send all buffers
client.Logger.Info("Shutting down - publishing all buffers")
client.Logger.Info(
"Shutting down - publishing all buffers",
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
client.publishAllBuffers()

// reinitialize expBackoff with MaxElapsedTime based on actually elapsed time of processing (previous phase)
processingElapsed := time.Since(processingStart)
remainingShutdownTimeout := maxDuration(shutdownTimeout-processingElapsed, shutdownTimeout/2)
maxElapsedTime = maxDuration(retryShutdownTimeout-processingElapsed, retryShutdownTimeout/2)
client.Logger.Info(
"Shutting down - waiting for buffers",
zap.Duration("maxElapsedTime", maxElapsedTime),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)

expBackoff = backoff.ExponentialBackOff{
InitialInterval: client.Config.BufferSettings.RetryInitialInterval,
RandomizationFactor: client.Config.BufferSettings.RetryRandomizationFactor,
Multiplier: client.Config.BufferSettings.RetryMultiplier,
MaxInterval: client.Config.BufferSettings.RetryMaxInterval,
MaxElapsedTime: remainingShutdownTimeout,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
Expand All @@ -301,25 +313,43 @@ func (client *DataSetClient) Shutdown() error {
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
if backoffDelay == expBackoff.Stop {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
)
break
}
time.Sleep(backoffDelay)
retryNum++
}

// construct error messages
if client.isProcessingEvents() {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.eventsEnqueued.Load()-client.eventsProcessed.Load(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
)
}

if client.isProcessingBuffers() {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
)
}

buffersDropped := client.buffersDropped.Load() - initialDropped
if buffersDropped > 0 {
lastError = fmt.Errorf(
Expand All @@ -336,9 +366,17 @@ func (client *DataSetClient) Shutdown() error {
client.logStatistics()

if lastError == nil {
client.Logger.Info("Shutting down - success")
client.Logger.Info(
"Shutting down - success",
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
} else {
client.Logger.Error("Shutting down - error", zap.Error(lastError))
client.Logger.Error(
"Shutting down - error", zap.Error(lastError),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
if client.LastError() == nil {
return lastError
}
Expand Down Expand Up @@ -475,13 +513,6 @@ func truncateText(text string, length int) string {
return text
}

func minDuration(a, b time.Duration) time.Duration {
if a <= b {
return a
}
return b
}

func maxDuration(a, b time.Duration) time.Duration {
if a >= b {
return a
Expand Down
11 changes: 6 additions & 5 deletions pkg/client/add_events_long_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
}

lastCall.Store(time.Now().UnixNano())
time.Sleep(time.Duration(float64(MaxDelay) * 0.7))
time.Sleep(time.Duration(float64(MaxDelay) * 0.6))
payload, err := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
Expand All @@ -92,12 +92,13 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 1000,
MaxLifetime: MaxDelay,
MaxLifetime: 5 * MaxDelay,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
RetryShutdownTimeout: 50 * RetryBase,
},
ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(),
}
Expand Down Expand Up @@ -146,16 +147,16 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
time.Sleep(time.Duration(float64(MaxDelay) * 0.3))
}

err = sc.Shutdown()
assert.Nil(t, err, err)

for {
if time.Now().UnixNano()-lastCall.Load() > 5*time.Second.Nanoseconds() {
break
}
time.Sleep(time.Second)
}

err = sc.Shutdown()
assert.Nil(t, err, err)

assert.Equal(t, seenKeys, expectedKeys)
assert.Equal(t, processedEvents.Load(), ExpectedLogs, "processed items")
assert.Equal(t, uint64(len(seenKeys)), ExpectedLogs, "unique items")
Expand Down
Loading

0 comments on commit 9c8ae03

Please sign in to comment.