Skip to content

Commit 73297c1

Browse files
authored
fix: inconsistent state detection and rollback (#2983)
* Fix inconsistent state detection and rollback * Review feedback * Review feedback * Review feedback
1 parent 43c729b commit 73297c1

File tree

3 files changed

+169
-19
lines changed

3 files changed

+169
-19
lines changed

apps/evm/cmd/rollback.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package cmd
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"os"
79

10+
"github.com/ethereum/go-ethereum/common"
11+
ds "github.com/ipfs/go-datastore"
812
"github.com/spf13/cobra"
913

1014
goheaderstore "github.com/celestiaorg/go-header/store"
15+
"github.com/evstack/ev-node/execution/evm"
1116
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1217
"github.com/evstack/ev-node/pkg/store"
1318
"github.com/evstack/ev-node/types"
@@ -42,7 +47,7 @@ func NewRollbackCmd() *cobra.Command {
4247

4348
defer func() {
4449
if closeErr := rawEvolveDB.Close(); closeErr != nil {
45-
fmt.Printf("Warning: failed to close evolve database: %v\n", closeErr)
50+
cmd.Printf("Warning: failed to close evolve database: %v\n", closeErr)
4651
}
4752
}()
4853

@@ -63,6 +68,17 @@ func NewRollbackCmd() *cobra.Command {
6368
return fmt.Errorf("failed to rollback ev-node state: %w", err)
6469
}
6570

71+
// rollback execution layer via EngineClient
72+
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
73+
if err != nil {
74+
cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
75+
} else {
76+
if err := engineClient.Rollback(goCtx, height); err != nil {
77+
return fmt.Errorf("failed to rollback execution layer: %w", err)
78+
}
79+
cmd.Printf("Rolled back execution layer to height %d\n", height)
80+
}
81+
6682
// rollback ev-node goheader state
6783
headerStore, err := goheaderstore.NewStore[*types.SignedHeader](
6884
evolveDB,
@@ -101,7 +117,7 @@ func NewRollbackCmd() *cobra.Command {
101117
errs = errors.Join(errs, fmt.Errorf("failed to rollback data sync service state: %w", err))
102118
}
103119

104-
fmt.Printf("Rolled back ev-node state to height %d\n", height)
120+
cmd.Printf("Rolled back ev-node state to height %d\n", height)
105121
if syncNode {
106122
fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
107123
}
@@ -113,5 +129,42 @@ func NewRollbackCmd() *cobra.Command {
113129
cmd.Flags().Uint64Var(&height, "height", 0, "rollback to a specific height")
114130
cmd.Flags().BoolVar(&syncNode, "sync-node", false, "sync node (no aggregator)")
115131

132+
// EVM flags for execution layer rollback
133+
cmd.Flags().String(evm.FlagEvmEthURL, "http://localhost:8545", "URL of the Ethereum JSON-RPC endpoint")
134+
cmd.Flags().String(evm.FlagEvmEngineURL, "http://localhost:8551", "URL of the Engine API endpoint")
135+
cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication")
136+
116137
return cmd
117138
}
139+
140+
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
141+
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
142+
if err != nil {
143+
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err)
144+
}
145+
engineURL, err := cmd.Flags().GetString(evm.FlagEvmEngineURL)
146+
if err != nil {
147+
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEngineURL, err)
148+
}
149+
150+
jwtSecretFile, err := cmd.Flags().GetString(evm.FlagEvmJWTSecretFile)
151+
if err != nil {
152+
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmJWTSecretFile, err)
153+
}
154+
155+
if jwtSecretFile == "" {
156+
return nil, fmt.Errorf("JWT secret file must be provided via --evm.jwt-secret-file for EL rollback")
157+
}
158+
159+
secretBytes, err := os.ReadFile(jwtSecretFile)
160+
if err != nil {
161+
return nil, fmt.Errorf("failed to read JWT secret from file '%s': %w", jwtSecretFile, err)
162+
}
163+
jwtSecret := string(bytes.TrimSpace(secretBytes))
164+
165+
if jwtSecret == "" {
166+
return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
167+
}
168+
169+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false)
170+
}

core/execution/execution.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,15 @@ type HeightProvider interface {
102102
// - error: Any errors during height retrieval
103103
GetLatestHeight(ctx context.Context) (uint64, error)
104104
}
105+
106+
// Rollbackable is an optional interface that execution clients can implement
107+
// to support automatic rollback when the execution layer is ahead of the target height.
108+
// This enables automatic recovery during rolling restarts when the EL has committed
109+
// blocks that were not replicated to the consensus layer.
110+
//
111+
// Requirements:
112+
// - Only execution layers supporting in-flight rollback should implement this.
113+
type Rollbackable interface {
114+
// Rollback resets the execution layer head to the specified height.
115+
Rollback(ctx context.Context, targetHeight uint64) error
116+
}

execution/evm/execution.go

Lines changed: 102 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ var (
5858
// Ensure EngineAPIExecutionClient implements the execution.Execute interface
5959
var _ execution.Executor = (*EngineClient)(nil)
6060

61+
// Ensure EngineClient implements the execution.HeightProvider interface
62+
var _ execution.HeightProvider = (*EngineClient)(nil)
63+
64+
// Ensure EngineClient implements the execution.Rollbackable interface
65+
var _ execution.Rollbackable = (*EngineClient)(nil)
66+
6167
// validatePayloadStatus checks the payload status and returns appropriate errors.
6268
// It implements the Engine API specification's status handling:
6369
// - VALID: Operation succeeded, return nil
@@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
338344
func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {
339345

340346
// 1. Check for idempotent execution
341-
stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs)
347+
stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs)
342348
if err != nil {
343349
c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed")
344350
// Continue execution on error, as it might be transient
@@ -548,6 +554,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common.
548554

549555
// doForkchoiceUpdate performs the actual forkchoice update RPC call with retry logic.
550556
func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error {
557+
551558
// Call forkchoice update with retry logic for SYNCING status
552559
err := retryWithBackoffOnPayloadStatus(ctx, func() error {
553560
forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil)
@@ -690,35 +697,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte)
690697
return stateRoot, err
691698
}
692699

693-
// checkIdempotency checks if the block at the given height and timestamp has already been executed.
700+
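// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed, and rolls the EL back to height-1 when a block with a different timestamp already exists at that height.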
694701
// It returns:
695702
// - stateRoot: non-nil if block is already promoted/finalized (idempotent success)
696703
// - payloadID: non-nil if block execution was started but not finished (resume needed)
697704
// - found: true if either of the above is true
698705
// - err: error during checks
699-
func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
706+
func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
700707
// 1. Check ExecMeta from store
701708
execMeta, err := c.store.GetExecMeta(ctx, height)
702709
if err == nil && execMeta != nil {
703-
// If we already have a promoted block at this height, return the stored StateRoot
710+
// If we already have a promoted block at this height, verify timestamp matches
711+
// to catch Dual-Store Conflicts where ExecMeta was saved for an old block
712+
// that was later replaced via consensus.
704713
if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
705-
c.logger.Info().
714+
if execMeta.Timestamp == timestamp.Unix() {
715+
c.logger.Info().
716+
Uint64("height", height).
717+
Str("stage", execMeta.Stage).
718+
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
719+
return execMeta.StateRoot, nil, true, nil
720+
}
721+
// Timestamp mismatch - ExecMeta is stale from an old block that was replaced.
722+
// Ignore it and proceed to EL check which will handle rollback if needed.
723+
c.logger.Warn().
706724
Uint64("height", height).
707-
Str("stage", execMeta.Stage).
708-
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
709-
return execMeta.StateRoot, nil, true, nil
725+
Int64("execmeta_timestamp", execMeta.Timestamp).
726+
Int64("requested_timestamp", timestamp.Unix()).
727+
Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record")
710728
}
711729

712-
// If we have a started execution with a payloadID, return it to resume
730+
// If we have a started execution with a payloadID, validate it still exists before resuming.
731+
// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
713732
if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
714-
c.logger.Info().
715-
Uint64("height", height).
716-
Str("stage", execMeta.Stage).
717-
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
718-
719733
var pid engine.PayloadID
720734
copy(pid[:], execMeta.PayloadID)
721-
return nil, &pid, true, nil
735+
736+
// Validate payload still exists by attempting to retrieve it
737+
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
738+
c.logger.Info().
739+
Uint64("height", height).
740+
Str("stage", execMeta.Stage).
741+
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
742+
return nil, &pid, true, nil
743+
}
744+
// Payload is stale (expired or node restarted) - proceed with fresh execution
745+
c.logger.Warn().
746+
Uint64("height", height).
747+
Str("payloadID", pid.String()).
748+
Err(err).
749+
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
750+
// Don't return - fall through to fresh execution
722751
}
723752
}
724753

@@ -744,12 +773,27 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time
744773

745774
return existingStateRoot.Bytes(), nil, true, nil
746775
}
747-
// Timestamp mismatch - log warning but proceed
776+
// Timestamp mismatch - we need to roll back the EL to height-1 so the block can be re-executed
748777
c.logger.Warn().
749778
Uint64("height", height).
750779
Uint64("existingTimestamp", existingTimestamp).
751780
Int64("requestedTimestamp", timestamp.Unix()).
752-
Msg("ExecuteTxs: block exists at height but timestamp differs")
781+
Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync")
782+
783+
// Rollback to height-1 to allow re-execution with correct timestamp
784+
if height > 0 {
785+
if err := c.Rollback(ctx, height-1); err != nil {
786+
c.logger.Error().Err(err).
787+
Uint64("height", height).
788+
Uint64("rollback_target", height-1).
789+
Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch")
790+
return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err)
791+
}
792+
c.logger.Info().
793+
Uint64("height", height).
794+
Uint64("rollback_target", height-1).
795+
Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp")
796+
}
753797
}
754798

755799
return nil, nil, false, nil
@@ -907,6 +951,47 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
907951
return header.Number.Uint64(), nil
908952
}
909953

954+
// Rollback resets the execution layer head to the specified height using forkchoice update.
955+
// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts).
956+
//
957+
// Implements the execution.Rollbackable interface.
958+
func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error {
959+
// Get block hash at target height
960+
blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight)
961+
if err != nil {
962+
return fmt.Errorf("get block at height %d: %w", targetHeight, err)
963+
}
964+
965+
c.logger.Info().
966+
Uint64("target_height", targetHeight).
967+
Str("block_hash", blockHash.Hex()).
968+
Msg("rolling back execution layer via forkchoice update")
969+
970+
// Reset head, safe, and finalized to target block
971+
// This forces the EL to reorg its canonical chain to the target height
972+
c.mu.Lock()
973+
c.currentHeadBlockHash = blockHash
974+
c.currentHeadHeight = targetHeight
975+
c.currentSafeBlockHash = blockHash
976+
c.currentFinalizedBlockHash = blockHash
977+
args := engine.ForkchoiceStateV1{
978+
HeadBlockHash: blockHash,
979+
SafeBlockHash: blockHash,
980+
FinalizedBlockHash: blockHash,
981+
}
982+
c.mu.Unlock()
983+
984+
if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
985+
return fmt.Errorf("forkchoice update for rollback failed: %w", err)
986+
}
987+
988+
c.logger.Info().
989+
Uint64("target_height", targetHeight).
990+
Msg("execution layer rollback completed")
991+
992+
return nil
993+
}
994+
910995
// decodeSecret decodes a hex-encoded JWT secret string into a byte slice.
911996
func decodeSecret(jwtSecret string) ([]byte, error) {
912997
secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))

0 commit comments

Comments
 (0)