Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DSET-4359: Fix flaky tests #46

Merged
merged 9 commits into from
Jul 31, 2023
13 changes: 7 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ test-many-times:
else \
COUNT=$(COUNT); \
fi; \
prefix=out-test-many-times-; \
rm -rfv $${prefix}*; \
for i in `seq 1 $${COUNT}`; do \
echo "Running test $${i} / $${COUNT}"; \
rm -rfv out-test-$${i}.log; \
make test 2>&1 | tee out-test-$${i}.log; \
echo; \
grep -H FAIL out-test-$${i}.log; \
echo "Running test $${i} / $${COUNT} - BEGIN"; \
make test 2>&1 | tee $${prefix}-$${i}.log | awk '{print "'$${i}'/'$${COUNT}'", $$0; }' ; \
echo; \
grep -H FAIL $${prefix}-$${i}.log; \
echo "Running test $${i} / $${COUNT} - END"; \
done; \
echo "Grep for FAIL - no lines should be found"; \
! grep -H FAIL out-test-*.log;
! grep -H FAIL $${prefix}-*.log;

foo:
! false
Expand Down
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release Notes

## 0.12.0

* fix: make client shutdown timeout configurable.

## 0.0.11

* feat: make client shutdown timeout configurable.
Expand Down
26 changes: 18 additions & 8 deletions pkg/buffer_config/buffer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
const (
ShouldSentBufferSize = 5 * 1024 * 1024
// LimitBufferSize defines maximum payload size (before compression) for REST API
LimitBufferSize = 5*1024*1024 + 960*1024
MinimalMaxElapsedTime = time.Second
MinimalMaxInterval = time.Second
MinimalInitialInterval = 50 * time.Millisecond
MinimalMultiplier = 0.0
MinimalRandomizationFactor = 0.0
LimitBufferSize = 5*1024*1024 + 960*1024
MinimalMaxElapsedTime = time.Second
MinimalMaxInterval = time.Second
MinimalInitialInterval = 50 * time.Millisecond
MinimalMultiplier = 0.0
MinimalRandomizationFactor = 0.0
MinimalRetryShutdownTimeout = 2 * MinimalMaxElapsedTime
)

type DataSetBufferSettings struct {
Expand Down Expand Up @@ -147,7 +148,7 @@ func (cfg *DataSetBufferSettings) WithOptions(opts ...DataSetBufferSettingsOptio

func (cfg *DataSetBufferSettings) String() string {
return fmt.Sprintf(
"MaxLifetime: %s, MaxSize: %d, GroupBy: %s, RetryRandomizationFactor: %f, RetryMultiplier: %f, RetryInitialInterval: %s, RetryMaxInterval: %s, RetryMaxElapsedTime: %s",
"MaxLifetime: %s, MaxSize: %d, GroupBy: %s, RetryRandomizationFactor: %f, RetryMultiplier: %f, RetryInitialInterval: %s, RetryMaxInterval: %s, RetryMaxElapsedTime: %s, RetryShutdownTimeout: %s",
cfg.MaxLifetime,
cfg.MaxSize,
cfg.GroupBy,
Expand All @@ -156,6 +157,7 @@ func (cfg *DataSetBufferSettings) String() string {
cfg.RetryInitialInterval,
cfg.RetryMaxInterval,
cfg.RetryMaxElapsedTime,
cfg.RetryShutdownTimeout,
)
}

Expand Down Expand Up @@ -202,11 +204,19 @@ func (cfg *DataSetBufferSettings) Validate() error {

if cfg.RetryRandomizationFactor <= MinimalRandomizationFactor {
return fmt.Errorf(
"RetryRandomizationFactor has value %f which is less or equal than %f",
"RetryRandomizationFactor has value %f which is less or equal than %f",
cfg.RetryRandomizationFactor,
MinimalRandomizationFactor,
)
}

if cfg.RetryShutdownTimeout < MinimalRetryShutdownTimeout {
return fmt.Errorf(
"RetryShutdownTimeout has value %s which is less than %s",
cfg.RetryShutdownTimeout,
MinimalRetryShutdownTimeout,
)
}

return nil
}
9 changes: 8 additions & 1 deletion pkg/buffer_config/buffer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestConfigWithOptions(t *testing.T) {
WithRetryInitialInterval(8*time.Second),
WithRetryMaxInterval(30*time.Second),
WithRetryMaxElapsedTime(10*time.Minute),
WithRetryShutdownTimeout(2*time.Minute),
)

assert.Nil(t, errB)
Expand All @@ -41,6 +42,7 @@ func TestConfigWithOptions(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)
}

Expand All @@ -52,6 +54,7 @@ func TestDataConfigUpdate(t *testing.T) {
WithRetryInitialInterval(8*time.Second),
WithRetryMaxInterval(30*time.Second),
WithRetryMaxElapsedTime(10*time.Minute),
WithRetryShutdownTimeout(2*time.Minute),
)
assert.Nil(t, errB)

Expand All @@ -62,6 +65,7 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)

bufCfg2, err := bufCfg.WithOptions(
Expand All @@ -71,6 +75,7 @@ func TestDataConfigUpdate(t *testing.T) {
WithRetryInitialInterval(28*time.Second),
WithRetryMaxInterval(230*time.Second),
WithRetryMaxElapsedTime(210*time.Minute),
WithRetryShutdownTimeout(5*time.Minute),
)
assert.Nil(t, err)

Expand All @@ -82,6 +87,7 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 8 * time.Second,
RetryMaxInterval: 30 * time.Second,
RetryMaxElapsedTime: 10 * time.Minute,
RetryShutdownTimeout: 2 * time.Minute,
}, *bufCfg)

// new config is changed
Expand All @@ -92,12 +98,13 @@ func TestDataConfigUpdate(t *testing.T) {
RetryInitialInterval: 28 * time.Second,
RetryMaxInterval: 230 * time.Second,
RetryMaxElapsedTime: 210 * time.Minute,
RetryShutdownTimeout: 5 * time.Minute,
}, *bufCfg2)
}

func TestDataConfigNewDefaultToString(t *testing.T) {
cfg := NewDefaultDataSetBufferSettings()
assert.Equal(t, "MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s", cfg.String())
assert.Equal(t, "MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s", cfg.String())
}

func TestDataConfigNewDefaultIsValid(t *testing.T) {
Expand Down
105 changes: 68 additions & 37 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,28 +217,38 @@ func (client *DataSetClient) isProcessingEvents() bool {
// - tries (with 2nd half of shutdownMaxTimeout period) to send processed events (buffers) into DataSet
func (client *DataSetClient) Shutdown() error {
client.Logger.Info("Shutting down - BEGIN")
// start measuring processing time
processingStart := time.Now()

// mark as finished to prevent processing of further events
client.finished.Store(true)

// log statistics when finish was called
client.logStatistics()

retryShutdownTimeout := client.Config.BufferSettings.RetryShutdownTimeout
maxElapsedTime := retryShutdownTimeout/2 - 100*time.Millisecond
client.Logger.Info(
"Shutting down - waiting for events",
zap.Duration("maxElapsedTime", maxElapsedTime),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)

var lastError error = nil
shutdownTimeout := minDuration(client.Config.BufferSettings.RetryMaxElapsedTime, client.Config.BufferSettings.RetryShutdownTimeout)
expBackoff := backoff.ExponentialBackOff{
InitialInterval: client.Config.BufferSettings.RetryInitialInterval,
RandomizationFactor: client.Config.BufferSettings.RetryRandomizationFactor,
Multiplier: client.Config.BufferSettings.RetryMultiplier,
MaxInterval: client.Config.BufferSettings.RetryMaxInterval,
MaxElapsedTime: shutdownTimeout / 2,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

// try (with timeout) to process (add into buffers) events,
retryNum := 0
processingStart := time.Now()
for client.isProcessingEvents() {
// log statistics
client.logStatistics()
Expand All @@ -250,38 +260,40 @@ func (client *DataSetClient) Shutdown() error {
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
if backoffDelay == expBackoff.Stop {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.eventsEnqueued.Load()-client.eventsProcessed.Load(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
)
break
}
time.Sleep(backoffDelay)
retryNum++
}

// send all buffers
client.Logger.Info("Shutting down - publishing all buffers")
client.Logger.Info(
"Shutting down - publishing all buffers",
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
client.publishAllBuffers()

// reinitialize expBackoff with MaxElapsedTime based on actually elapsed time of processing (previous phase)
processingElapsed := time.Since(processingStart)
remainingShutdownTimeout := maxDuration(shutdownTimeout-processingElapsed, shutdownTimeout/2)
maxElapsedTime = maxDuration(retryShutdownTimeout-processingElapsed, retryShutdownTimeout/2)
client.Logger.Info(
"Shutting down - waiting for buffers",
zap.Duration("maxElapsedTime", maxElapsedTime),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)

expBackoff = backoff.ExponentialBackOff{
InitialInterval: client.Config.BufferSettings.RetryInitialInterval,
RandomizationFactor: client.Config.BufferSettings.RetryRandomizationFactor,
Multiplier: client.Config.BufferSettings.RetryMultiplier,
MaxInterval: client.Config.BufferSettings.RetryMaxInterval,
MaxElapsedTime: remainingShutdownTimeout,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
Expand All @@ -301,25 +313,43 @@ func (client *DataSetClient) Shutdown() error {
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
if backoffDelay == expBackoff.Stop {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
)
break
}
time.Sleep(backoffDelay)
retryNum++
}

// construct error messages
if client.isProcessingEvents() {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.eventsEnqueued.Load()-client.eventsProcessed.Load(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
)
}

if client.isProcessingBuffers() {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
)
}

buffersDropped := client.buffersDropped.Load() - initialDropped
if buffersDropped > 0 {
lastError = fmt.Errorf(
Expand All @@ -336,9 +366,17 @@ func (client *DataSetClient) Shutdown() error {
client.logStatistics()

if lastError == nil {
client.Logger.Info("Shutting down - success")
client.Logger.Info(
"Shutting down - success",
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
} else {
client.Logger.Error("Shutting down - error", zap.Error(lastError))
client.Logger.Error(
"Shutting down - error", zap.Error(lastError),
zap.Duration("retryShutdownTimeout", retryShutdownTimeout),
zap.Duration("elapsedTime", time.Since(processingStart)),
)
if client.LastError() == nil {
return lastError
}
Expand Down Expand Up @@ -475,13 +513,6 @@ func truncateText(text string, length int) string {
return text
}

func minDuration(a, b time.Duration) time.Duration {
if a <= b {
return a
}
return b
}

func maxDuration(a, b time.Duration) time.Duration {
if a >= b {
return a
Expand Down
11 changes: 6 additions & 5 deletions pkg/client/add_events_long_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
}

lastCall.Store(time.Now().UnixNano())
time.Sleep(time.Duration(float64(MaxDelay) * 0.7))
time.Sleep(time.Duration(float64(MaxDelay) * 0.6))
payload, err := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
Expand All @@ -92,12 +92,13 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 1000,
MaxLifetime: MaxDelay,
MaxLifetime: 5 * MaxDelay,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
RetryShutdownTimeout: 50 * RetryBase,
},
ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(),
}
Expand Down Expand Up @@ -146,16 +147,16 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
time.Sleep(time.Duration(float64(MaxDelay) * 0.3))
}

err = sc.Shutdown()
assert.Nil(t, err, err)

for {
if time.Now().UnixNano()-lastCall.Load() > 5*time.Second.Nanoseconds() {
break
}
time.Sleep(time.Second)
}

err = sc.Shutdown()
assert.Nil(t, err, err)

assert.Equal(t, seenKeys, expectedKeys)
assert.Equal(t, processedEvents.Load(), ExpectedLogs, "processed items")
assert.Equal(t, uint64(len(seenKeys)), ExpectedLogs, "unique items")
Expand Down
Loading