Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ github.com/kataras/pio v0.0.11 h1:kqreJ5KOEXGMwHAWHDwIl+mjfNCPhAwZPa8gK7MKlyw=
github.com/kataras/sitemap v0.0.6 h1:w71CRMMKYMJh6LR2wTgnk5hSgjVNB9KL60n5e2KHvLY=
github.com/kataras/tunnel v0.0.4 h1:sCAqWuJV7nPzGrlb0os3j49lk2JhILT0rID38NHNLpA=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
github.com/kilic/bls12-381 v0.1.0 h1:encrdjqKMEvabVQ7qYOKu1OvhqpK4s47wDYtNiPtlp4=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
Expand Down
74 changes: 48 additions & 26 deletions sei-tendermint/internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,40 +304,62 @@
return
}

r.logger.Info("checking if node is behind threshold, auto restarting if its behind", "threshold", r.blocksBehindThreshold, "interval", r.blocksBehindCheckInterval)
for {
select {
case <-time.After(r.blocksBehindCheckInterval):
selfHeight := r.store.Height()
maxPeerHeight := r.pool.MaxPeerHeight()
threshold := int64(r.blocksBehindThreshold) //nolint:gosec // validated in config.ValidateBasic against MaxInt64
behindHeight := maxPeerHeight - selfHeight
blockSyncIsSet := r.blockSync.IsSet()
if maxPeerHeight > r.previousMaxPeerHeight {
r.previousMaxPeerHeight = maxPeerHeight
}
r.logger.Info("checking if node is behind threshold, auto restarting if its behind", "threshold", r.blocksBehindThreshold, "interval", r.blocksBehindCheckInterval)
r.checkBehindAndSignalRestart()
case <-ctx.Done():
return
}
}
}

// We do not restart if we are not lagging behind, or we are already in block sync mode
if maxPeerHeight == 0 || behindHeight < threshold || blockSyncIsSet {
r.logger.Debug("does not exceed threshold or is already in block sync mode", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight, "blockSyncIsSet", blockSyncIsSet)
continue
}
// checkBehindAndSignalRestart checks whether the node is behind the max peer
// height by the configured threshold, and if so, attempts to send a restart
// signal. Returns true if a restart signal was attempted (sent or dropped due
// to a full channel).
//
// It returns false without signalling when no peer height is known
// (maxPeerHeight == 0), when the gap to the best peer is below
// blocksBehindThreshold, when blockSync is already set, or when fewer than
// restartCooldownSeconds have elapsed since lastRestartTime.
//
// Side effects: previousMaxPeerHeight is raised whenever a higher peer height
// is observed, and lastRestartTime is reset to the current time on every
// attempt, whether or not the send on restartCh succeeds.
//
// NOTE(review): this listing appears to interleave removed and added diff
// lines (the `continue` statements and the blocking send below look like
// leftovers of the previous loop body) — confirm against the checked-in file.
func (r *Reactor) checkBehindAndSignalRestart() bool {
selfHeight := r.store.Height()
maxPeerHeight := r.pool.MaxPeerHeight()
threshold := int64(r.blocksBehindThreshold) //nolint:gosec // validated in config.ValidateBasic against MaxInt64
behindHeight := maxPeerHeight - selfHeight
blockSyncIsSet := r.blockSync.IsSet()
// Remember the highest peer height observed so far.
if maxPeerHeight > r.previousMaxPeerHeight {
r.previousMaxPeerHeight = maxPeerHeight
}

// Check if we have met cooldown time
if time.Since(r.lastRestartTime).Seconds() < float64(r.restartCooldownSeconds) {
r.logger.Debug("we are lagging behind, going to trigger a restart after cooldown time passes")
continue
}
// We do not restart if we are not lagging behind, or we are already in block sync mode
if maxPeerHeight == 0 || behindHeight < threshold || blockSyncIsSet {
r.logger.Debug("does not exceed threshold or is already in block sync mode", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight, "blockSyncIsSet", blockSyncIsSet)
return false
}

r.logger.Info("Blocks behind threshold, restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)
// Check if we have met cooldown time
// The elapsed time since lastRestartTime is compared in seconds against
// restartCooldownSeconds.
if time.Since(r.lastRestartTime).Seconds() < float64(r.restartCooldownSeconds) {
r.logger.Debug("we are lagging behind, going to trigger a restart after cooldown time passes")
return false
}

// Send signal to restart the node
r.blockSync.Set()
r.restartCh <- struct{}{}
case <-ctx.Done():
return
}
r.logger.Info("Node is behind lagging threshold, restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)

// Reset cooldown timer before sending to prevent rapid-fire signals
// regardless of whether the app-level restart accepts or rejects this signal
r.lastRestartTime = time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism

// Non-blocking send to avoid goroutine getting stuck if no reader
// or if the app-level cooldown silently drops the signal.
// Note: we do NOT set blockSync here — blockSync is only managed by
// the block sync lifecycle (poolRoutine sets/unsets it). Setting it here
// would permanently disable self-remediation if the app-level cooldown
// in WaitForQuitSignals silently drops the restart signal.
select {
case r.restartCh <- struct{}{}:
r.logger.Info("Restart signal sent successfully")
default:
r.logger.Error("Failed to send restart signal, will retry after cooldown")
}
// An attempt was made either way: a signal dropped by the default branch
// still reports true to the caller.
return true
}

// processPeerUpdate processes a PeerUpdate.
Expand Down
184 changes: 130 additions & 54 deletions sei-tendermint/internal/blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,57 +310,48 @@ func (m *MockBlockStore) Height() int64 {
return args.Get(0).(int64)
}

func TestAutoRestartIfBehind(t *testing.T) {
func TestCheckBehindAndSignalRestart(t *testing.T) {
t.Parallel()
tests := []struct {
name string
blocksBehindThreshold uint64
blocksBehindCheckInterval time.Duration
selfHeight int64
maxPeerHeight int64
isBlockSync bool
restartExpected bool
name string
threshold uint64
selfHeight int64
maxPeerHeight int64
isBlockSync bool
restartExpected bool
}{
{
name: "Should not restart if blocksBehindThreshold is 0",
blocksBehindThreshold: 0,
blocksBehindCheckInterval: 10 * time.Millisecond,
selfHeight: 100,
maxPeerHeight: 200,
isBlockSync: false,
restartExpected: false,
name: "Should not signal if behindHeight is less than threshold",
threshold: 50,
selfHeight: 100,
maxPeerHeight: 140,
restartExpected: false,
},
{
name: "Should not restart if behindHeight is less than threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 140,
isBlockSync: false,
restartExpected: false,
name: "Should signal if behindHeight meets threshold",
threshold: 50,
selfHeight: 100,
maxPeerHeight: 160,
restartExpected: true,
},
{
name: "Should restart if behindHeight is greater than or equal to threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 160,
isBlockSync: false,
restartExpected: true,
name: "Should not signal if maxPeerHeight is 0",
threshold: 50,
selfHeight: 100,
maxPeerHeight: 0,
restartExpected: false,
},
{
name: "Should not restart if blocksync",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 160,
isBlockSync: true,
restartExpected: false,
name: "Should not signal if already in block sync mode",
threshold: 50,
selfHeight: 100,
maxPeerHeight: 160,
isBlockSync: true,
restartExpected: false,
},
}

for _, tt := range tests {
t.Log(tt.name)
t.Run(tt.name, func(t *testing.T) {
mockBlockStore := new(MockBlockStore)
mockBlockStore.On("Height").Return(tt.selfHeight)
Expand All @@ -373,26 +364,111 @@ func TestAutoRestartIfBehind(t *testing.T) {

restartChan := make(chan struct{}, 1)
r := &Reactor{
logger: log.TestingLogger(),
store: mockBlockStore,
pool: blockPool,
blocksBehindThreshold: tt.blocksBehindThreshold,
blocksBehindCheckInterval: tt.blocksBehindCheckInterval,
restartCh: restartChan,
blockSync: newAtomicBool(tt.isBlockSync),
logger: log.TestingLogger(),
store: mockBlockStore,
pool: blockPool,
blocksBehindThreshold: tt.threshold,
restartCh: restartChan,
blockSync: newAtomicBool(tt.isBlockSync),
}

ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()
signaled := r.checkBehindAndSignalRestart()
assert.Equal(t, tt.restartExpected, signaled)
assert.Equal(t, tt.restartExpected, len(restartChan) == 1)
})
}
}

// TestAutoRestartShouldNotGetStuck verifies that the self-remediation loop
// does not permanently disable itself. This is a fully deterministic test
// with no goroutines or timers — it calls checkBehindAndSignalRestart directly.
//
// Previously, blockSync.Set() was called before sending the restart signal.
// If the app-level cooldown in WaitForQuitSignals silently dropped that signal,
// the blockSync flag remained true forever, preventing any future restart
// attempts. With the fix:
// - blockSync is NOT set by the check (it is read-only)
// - The send to restartCh is non-blocking so the goroutine never gets stuck
// - After a dropped signal, the next check still fires
// - On restart, node.go reconstructs the reactor with blockSync=true (for
// non-validator nodes), so it enters block sync mode to catch up
//
// The reactor under test is built as a struct literal, so only the fields
// listed in that literal are populated. Cooldown expiry is simulated by
// assigning lastRestartTime directly rather than by waiting.
//
// NOTE(review): this listing appears to interleave removed and added diff
// lines (the `go r.autoRestartIfBehind(ctx)` call and the `select` on
// restartChan below look like leftovers of the previous test) — confirm
// against the checked-in file.
func TestAutoRestartShouldNotGetStuck(t *testing.T) {
mockBlockStore := new(MockBlockStore)
mockBlockStore.On("Height").Return(int64(100))

// Pool reports a best peer 100 blocks ahead of the mocked store height.
blockPool := &BlockPool{
logger: log.TestingLogger(),
height: 100,
maxPeerHeight: 200,
}

go r.autoRestartIfBehind(ctx)
cooldownSeconds := uint64(300) // 5 minute cooldown — large enough to never expire during the test
// Capacity 1: a second send without a receive in between cannot be buffered.
restartChan := make(chan struct{}, 1)
r := &Reactor{
logger: log.TestingLogger(),
store: mockBlockStore,
pool: blockPool,
blocksBehindThreshold: 50,
restartCooldownSeconds: cooldownSeconds,
lastRestartTime: time.Time{}, // zero value — cooldown is already expired
restartCh: restartChan,
blockSync: newAtomicBool(false),
}

select {
case <-restartChan:
assert.True(t, tt.restartExpected, "Unexpected restart")
case <-time.After(50 * time.Millisecond):
assert.False(t, tt.restartExpected, "Expected restart but did not occur")
}
})
// Step 1: First check — cooldown has expired (lastRestartTime is zero),
// node is behind threshold, signal is sent.
signaled := r.checkBehindAndSignalRestart()
assert.True(t, signaled, "Expected restart signal on first check")
assert.Equal(t, 1, len(restartChan), "Signal should be in the channel")

// Step 2: blockSync must NOT have been set — this was the root cause of the old bug.
assert.False(t, r.blockSync.IsSet(),
"checkBehindAndSignalRestart must not set blockSync; doing so would permanently disable retries")

// Step 3: Cooldown is now active (lastRestartTime was set to time.Now() by
// the check). Subsequent checks should be blocked by cooldown.
<-restartChan // consume the signal — simulating app-level cooldown dropping it
signaled = r.checkBehindAndSignalRestart()
assert.False(t, signaled, "Should not signal while reactor-level cooldown is active")
assert.Equal(t, 0, len(restartChan), "No signal should be sent during cooldown")

// Step 4: Simulate cooldown expiring by backdating lastRestartTime.
r.lastRestartTime = time.Now().Add(-time.Duration(cooldownSeconds+1) * time.Second)

// Step 5: Now the check fires again — because blockSync was NOT set and
// cooldown has expired, self-remediation is not stuck.
signaled = r.checkBehindAndSignalRestart()
assert.True(t, signaled, "Expected retry signal after cooldown expired — self-remediation must not be stuck")
assert.Equal(t, 1, len(restartChan), "Retry signal should be in the channel")
assert.False(t, r.blockSync.IsSet(), "blockSync should remain false after retry")

// Step 6: Non-blocking send — if the channel is already full, the check
// should not block and should still return true.
// The signal from Step 5 is still buffered, so this send cannot be queued.
r.lastRestartTime = time.Time{} // reset cooldown so the check reaches the send
signaled = r.checkBehindAndSignalRestart()
assert.True(t, signaled, "Should still attempt signal even when channel is full")
assert.Equal(t, 1, len(restartChan), "Channel should still have exactly 1 signal (non-blocking)")

// Step 7: Simulate the in-process restart. When the node rebuilds, node.go
// determines blockSync = !onlyValidatorIsUs(state, pubKey) which is true for
// non-validator / RPC nodes, and passes it to NewReactor. Verify that the
// newly constructed reactor starts with blockSync=true so it enters block
// sync mode to catch up.
restartedReactor := &Reactor{
logger: log.TestingLogger(),
store: mockBlockStore,
pool: blockPool,
blocksBehindThreshold: 50,
restartCooldownSeconds: cooldownSeconds,
restartCh: make(chan struct{}, 1),
blockSync: newAtomicBool(true), // node.go sets this true for non-validator
}
assert.True(t, restartedReactor.blockSync.IsSet(),
"After restart, reactor must be in block sync mode so the node catches up")

// Step 8: While in block sync mode, the check should NOT signal — we don't
// want self-remediation firing while block sync is actively catching up.
signaled = restartedReactor.checkBehindAndSignalRestart()
assert.False(t, signaled, "Should not signal while in block sync mode")
assert.Equal(t, 0, len(restartedReactor.restartCh), "No signal should be sent during block sync")
}
Loading