Skip to content

replace time.Sleep with assert.Eventually in tests #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/composite/reload_membership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func TestMembershipChangesBasic(t *testing.T) {
}()

// Wait for initial configuration to be applied
time.Sleep(250 * time.Millisecond)
assert.Eventually(t, func() bool {
return runner.IsRunning()
}, 1*time.Second, 10*time.Millisecond)

// --- Test Step 1: Remove worker1, add worker3 ---
worker3, err := NewTestWorker(t,
Expand Down
67 changes: 28 additions & 39 deletions examples/composite/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
// --- Helper Functions ---

// testLogger creates a logger for tests, optionally capturing output.
func testLogger(t *testing.T, capture bool) (*slog.Logger, *bytes.Buffer) { //nolint:thelper
func testLogger(t *testing.T, capture bool) (*slog.Logger, *bytes.Buffer) {
t.Helper()
var buf bytes.Buffer
w := io.Discard
if capture {
Expand All @@ -40,14 +41,16 @@ func testLogger(t *testing.T, capture bool) (*slog.Logger, *bytes.Buffer) { //no
}

// readWorkerConfig safely reads the current config from the worker.
func readWorkerConfig(w *Worker) WorkerConfig {
func readWorkerConfig(t *testing.T, w *Worker) WorkerConfig {
t.Helper() // mark as a test helper so failures are attributed to the caller
w.mu.Lock()
defer w.mu.Unlock()
return w.config // WorkerConfig is returned by value, so the caller gets a copy
}

// readWorkerName safely reads the current name from the worker.
func readWorkerName(w *Worker) string {
func readWorkerName(t *testing.T, w *Worker) string {
t.Helper()
w.mu.Lock()
defer w.mu.Unlock()
return w.name
Expand Down Expand Up @@ -218,7 +221,7 @@ func TestWorker_Run_ContextCancel(t *testing.T) {
// TestWorker_ReloadWithConfig tests applying valid and invalid configs via ReloadWithConfig.
func TestWorker_ReloadWithConfig(t *testing.T) {
t.Parallel()
logger, logBuf := testLogger(t, true) // Capture logs
logger, _ := testLogger(t, false)

originalConfig := WorkerConfig{
Interval: 100 * time.Millisecond,
Expand All @@ -242,8 +245,10 @@ func TestWorker_ReloadWithConfig(t *testing.T) {

go func() {
defer wg.Done()
// Ignore error here, checked elsewhere
_ = worker.Run(ctx) //nolint:errcheck
err = worker.Run(ctx)
if err != nil {
t.Errorf("Worker run failed: %v", err)
}
}()

// Wait for worker to start running (at least one tick)
Expand All @@ -261,61 +266,47 @@ func TestWorker_ReloadWithConfig(t *testing.T) {

// Wait for the configuration to be applied using Eventually
require.Eventually(t, func() bool {
current := readWorkerConfig(worker)
name := readWorkerName(worker)
current := readWorkerConfig(t, worker)
name := readWorkerName(t, worker)
return current.Interval == newConfig.Interval && current.JobName == newConfig.JobName &&
name == newConfig.JobName
}, 1*time.Second, pollInterval, "Worker config was not updated after valid ReloadWithConfig")
t.Logf("Worker config updated successfully to: %+v", readWorkerConfig(worker))
t.Logf("Worker config updated successfully to: %+v", readWorkerConfig(t, worker))

// --- Test Invalid Config Type ---
configAfterValidReload := readWorkerConfig(worker) // Store state before invalid reload
configAfterValidReload := readWorkerConfig(t, worker) // Store state before invalid reload
t.Log("Reloading with invalid type 'string'")
worker.ReloadWithConfig("invalid type") // Pass a non-WorkerConfig type

// Wait a short time and assert config hasn't changed
time.Sleep(50 * time.Millisecond) // Allow select loop to process if needed
currentConfig := readWorkerConfig(worker)
assert.Eventually(t, func() bool {
return worker.tickCount.Load() > 0
}, 1*time.Second, pollInterval, "Worker did not start ticking")

currentConfig := readWorkerConfig(t, worker)
assert.Equal(
t,
configAfterValidReload,
currentConfig,
"Config should not change after invalid type reload",
)
assert.Contains(
t,
logBuf.String(),
"Invalid config type received",
"Should log error for invalid type",
)
logBuf.Reset() // Clear buffer for next check

// --- Test Invalid Config Values ---
invalidValueConfig := WorkerConfig{Interval: 0, JobName: "wont-apply"}
t.Logf("Reloading with invalid value config: %+v", invalidValueConfig)
worker.ReloadWithConfig(invalidValueConfig)

// Wait and assert config hasn't changed
time.Sleep(50 * time.Millisecond)
currentConfig = readWorkerConfig(worker)
assert.Eventually(t, func() bool {
return worker.tickCount.Load() > 0
}, 1*time.Second, pollInterval, "Worker did not start ticking")
currentConfig = readWorkerConfig(t, worker)
assert.Equal(
t,
configAfterValidReload,
currentConfig,
"Config should not change after invalid value reload",
)
assert.Contains(
t,
logBuf.String(),
"Invalid configuration received in ReloadWithConfig",
"Should log error for invalid value",
)
assert.Contains(
t,
logBuf.String(),
"interval must be positive",
"Should log specific validation error",
)
}

// TestWorker_Execution_Timing tests that the worker ticks according to the configured interval,
Expand Down Expand Up @@ -411,7 +402,7 @@ func TestWorker_Execution_Timing(t *testing.T) {

// Ensure config is applied before measuring again
require.Eventually(t, func() bool {
return readWorkerConfig(worker).Interval == reloadedInterval
return readWorkerConfig(t, worker).Interval == reloadedInterval
}, 1*time.Second, pollInterval, "Config interval did not update after reload")
t.Log("Config interval updated.")

Expand Down Expand Up @@ -481,7 +472,7 @@ func TestWorker_ReloadWithConfig_Concurrency(t *testing.T) {
finalConfig := WorkerConfig{} // Store the config from the last reload goroutine

// Launch concurrent reloads
for i := 0; i < numReloads; i++ {
for i := range numReloads {
go func(index int) {
defer reloadWg.Done()
// Vary interval and name slightly for each reload
Expand All @@ -494,8 +485,6 @@ func TestWorker_ReloadWithConfig_Concurrency(t *testing.T) {
finalConfig = cfg
}
worker.ReloadWithConfig(cfg)
// Add a small random sleep to increase inter-leaving chances
// time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}(i)
}

Expand All @@ -509,7 +498,7 @@ func TestWorker_ReloadWithConfig_Concurrency(t *testing.T) {
time.Sleep(2 * time.Second)

t.Logf("Final config details - Expected: %v, Current: %v",
finalConfig.Interval, readWorkerConfig(worker).Interval)
finalConfig.Interval, readWorkerConfig(t, worker).Interval)

// Because ReloadWithConfig replaces the config in the channel if full,
// we expect the *last* successfully queued config to eventually be applied.
Expand All @@ -524,7 +513,7 @@ func TestWorker_ReloadWithConfig_Concurrency(t *testing.T) {

t.Logf(
"Final applied config interval: %v (expected: %v)",
readWorkerConfig(worker).Interval,
readWorkerConfig(t, worker).Interval,
finalConfig.Interval,
)
// Check tick count is still advancing (worker didn't get stuck)
Expand Down
15 changes: 12 additions & 3 deletions examples/httpcluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (cm *ConfigManager) getCurrentPort() string {
// updatePort updates the cluster configuration with a new port
func (cm *ConfigManager) updatePort(newPort string) error {
cm.mu.Lock()
defer cm.mu.Unlock()

// Validate port format using the networking package helper
validatedPort, err := networking.ValidatePort(newPort)
if err != nil {
cm.mu.Unlock()
return fmt.Errorf("invalid port: %w", err)
}

Expand All @@ -90,6 +90,7 @@ func (cm *ConfigManager) updatePort(newPort string) error {
cm.commonMw...,
)
if err != nil {
cm.mu.Unlock()
return fmt.Errorf("failed to create status route: %w", err)
}

Expand All @@ -100,6 +101,7 @@ func (cm *ConfigManager) updatePort(newPort string) error {
cm.commonMw...,
)
if err != nil {
cm.mu.Unlock()
return fmt.Errorf("failed to create config route: %w", err)
}

Expand All @@ -110,15 +112,22 @@ func (cm *ConfigManager) updatePort(newPort string) error {
httpserver.WithDrainTimeout(DrainTimeout),
)
if err != nil {
cm.mu.Unlock()
return fmt.Errorf("failed to create config: %w", err)
}

// Send configuration update (will block until cluster is ready)
// Store the old port and update to new port before releasing lock
oldPort := cm.currentPort
cm.currentPort = newPort

// Release the mutex before blocking channel send
cm.mu.Unlock()

// Send configuration update (will block until cluster is ready)
cm.cluster.GetConfigSiphon() <- map[string]*httpserver.Config{
"main": config,
}
cm.currentPort = newPort

if newPort != oldPort {
cm.logger.Info("Configuration updated", "old_port", oldPort, "new_port", newPort)
}
Expand Down
4 changes: 3 additions & 1 deletion runnables/composite/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,9 @@ func TestReloadMembershipChanged(t *testing.T) {
runner.Reload()

// Wait for reload to complete
time.Sleep(50 * time.Millisecond)
require.Eventually(t, func() bool {
return runner.GetState() == finitestate.StatusRunning
}, 1*time.Second, 10*time.Millisecond, "Runner should transition to Running state")

// Verify the config was updated with the new runnable
updatedConfig := runner.getConfig()
Expand Down