Skip to content

Commit

Permalink
feat: sequencer auto recover when meet an unexpected shutdown (#166)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com>
  • Loading branch information
krish-nr and owen-reorg authored Nov 13, 2024
1 parent 1770748 commit 215dee2
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 15 deletions.
84 changes: 74 additions & 10 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,17 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateRecoveringStatus atomic.Bool

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -2223,11 +2224,25 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
return 0, nil
}

func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, error) {
return bc.recoverStateAndSetHead(block)
}

// recoverAncestors finds the closest ancestor with available state and re-execute
// all the ancestor blocks since that.
// recoverAncestors is only used post-merge.
// We return the hash of the latest block that we could correctly validate.
func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) {
if bc.stateRecoveringStatus.Load() {
log.Warn("recover is already in progress, skipping", "block", block.Hash())
return common.Hash{}, errors.New("state recover in progress")
}

bc.stateRecoveringStatus.Store(true)
defer func() {
bc.stateRecoveringStatus.Store(false)
}()

// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
hashes []common.Hash
Expand Down Expand Up @@ -2644,6 +2659,55 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) {
return 0, err
}

// recoverStateAndSetHead attempts to recover the state of the blockchain by re-importing
// missing blocks and advancing the chain head. It ensures the state is available
// for the given block and its ancestors before updating the head.
func (bc *BlockChain) recoverStateAndSetHead(block *types.Block) (common.Hash, error) {
var (
hashes []common.Hash
numbers []uint64
parent = block
)
for parent != nil && !bc.HasState(parent.Root()) {
if bc.stateRecoverable(parent.Root()) {
if err := bc.triedb.Recover(parent.Root()); err != nil {
return common.Hash{}, err
}
break
}
hashes = append(hashes, parent.Hash())
numbers = append(numbers, parent.NumberU64())
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)

// If the chain is terminating, stop iteration
if bc.insertStopped() {
log.Debug("Abort during blocks iteration")
return common.Hash{}, errInsertionInterrupted
}
}
if parent == nil {
return common.Hash{}, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
for i := len(hashes) - 1; i >= 0; i-- {
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return common.Hash{}, errInsertionInterrupted
}
var b *types.Block
if i == 0 {
b = block
} else {
b = bc.GetBlock(hashes[i], numbers[i])
}
if _, err := bc.insertChain(types.Blocks{b}, true); err != nil {
return b.ParentHash(), err
}
}
return block.Hash(), nil
}

// SetBlockValidatorAndProcessorForTesting sets the current validator and processor.
// This method can be used to force an invalid blockchain to be verified for tests.
// This method is unsafe and should only be used before block import starts.
Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
if !cs.handler.chain.NoTries() && !cs.handler.chain.HasState(head.Root) {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
log.Info("Reenabled snap sync as chain is stateless")
log.Info("Reenabled snap sync as chain is stateless", "lost block", block.Number.Uint64())
return downloader.SnapSync, td
}
// Nope, we're really full syncing
Expand Down
56 changes: 56 additions & 0 deletions miner/fix_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package miner

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

// StateFixManager manages the fix operation state and notification mechanism.
type StateFixManager struct {
mutex sync.Mutex // Protects access to fix state
}

// NewFixManager initializes a FixManager with required dependencies
func NewFixManager() *StateFixManager {
return &StateFixManager{}
}

// StartFix launches a goroutine to manage the fix process and tracks the fix state.
func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) error {
fm.mutex.Lock()
defer fm.mutex.Unlock()

log.Info("Fix is in progress for the block", "id", id)

err := worker.fix(parentHash)
if err != nil {
log.Error("Fix process failed", "error", err)
return err
}

log.Info("Fix process completed successfully", "id", id)
return nil
}

// RecoverFromLocal attempts to recover the block and MPT data from the local chain.
//
// blockHash: The latest header(unsafe block) hash of the block to recover.
func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error {
block := w.chain.GetBlockByHash(blockHash)
if block == nil {
return fmt.Errorf("block not found in local chain")
}

log.Info("Fixing data for block", "block number", block.NumberU64())
latestValid, err := w.chain.RecoverStateAndSetHead(block)
if err != nil {
return fmt.Errorf("failed to recover state: %v", err)
}

log.Info("Recovered states up to block", "latestValid", latestValid)
return nil
}
5 changes: 3 additions & 2 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
Expand Down
74 changes: 74 additions & 0 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@
package miner

import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -263,6 +268,23 @@ func (payload *Payload) stopBuilding() {
})
}

// fix attempts to recover and repair the block and its associated data (such as MPT)
// from the local blockchain
// blockHash: The hash of the latest block that needs to be recovered and fixed.
func (w *worker) fix(blockHash common.Hash) error {
log.Info("Fix operation started")

// Try to recover from local data
err := w.stateFixManager.RecoverFromLocal(w, blockHash)
if err != nil {
log.Error("Failed to recover from local data", "err", err)
return err
}

log.Info("Fix operation completed successfully")
return nil
}

// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
if args.NoTxPool { // don't start the background payload updating job if there is no tx pool to pull from
Expand Down Expand Up @@ -318,6 +340,18 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
return nil, err
}

//check state of parent block
_, err = w.retrieveParentState(fullParams)
if err != nil && strings.Contains(err.Error(), "missing trie node") {
log.Error("missing parent state when building block, try to fix...")
// fix state data
fixErr := w.StartStateFix(args.Id(), fullParams.parentHash)
if fixErr != nil {
log.Error("fix failed", "err", fixErr)
}
return nil, err
}

payload := newPayload(nil, args.Id())
// set shared interrupt
fullParams.interrupt = payload.interrupt
Expand Down Expand Up @@ -430,3 +464,43 @@ func (w *worker) cacheMiningBlock(block *types.Block, env *environment) {
log.Info("Successfully cached sealed new block", "number", block.Number(), "root", block.Root(), "hash", hash,
"elapsed", common.PrettyDuration(time.Since(start)))
}

func (w *worker) retrieveParentState(genParams *generateParams) (state *state.StateDB, err error) {
w.mu.RLock()
defer w.mu.RUnlock()

log.Info("retrieveParentState validate")
// Find the parent block for sealing task
parent := w.chain.CurrentBlock()
if genParams.parentHash != (common.Hash{}) {
block := w.chain.GetBlockByHash(genParams.parentHash)
if block == nil {
return nil, fmt.Errorf("missing parent")
}
parent = block.Header()
}

state, err = w.chain.StateAt(parent.Root)

// If there is an error and Optimism is enabled in the chainConfig, allow reorg
if err != nil && w.chainConfig.Optimism != nil {
if historicalBackend, ok := w.eth.(BackendWithHistoricalState); ok {
// Attempt to retrieve the historical state
var release tracers.StateReleaseFunc
parentBlock := w.eth.BlockChain().GetBlockByHash(parent.Hash())
state, release, err = historicalBackend.StateAtBlock(
context.Background(), parentBlock, ^uint64(0), nil, false, false,
)

// Copy the state and release the resources
state = state.Copy()
release()
}
}

// Return the state and any error encountered
if err != nil {
return nil, err
}
return state, nil
}
12 changes: 11 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (

mapset "github.com/deckarep/golang-set/v2"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand All @@ -43,7 +46,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -268,6 +270,13 @@ type worker struct {

// MEV
bundleCache *BundleCache

// FixManager
stateFixManager *StateFixManager
}

func (w *worker) StartStateFix(id engine.PayloadID, parentHash common.Hash) error {
return w.stateFixManager.StartFix(w, id, parentHash)
}

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker {
Expand All @@ -294,6 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
bundleCache: NewBundleCache(),
stateFixManager: NewFixManager(),
}
// Subscribe for transaction insertion events (whether from network or resurrects)
worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true)
Expand Down
3 changes: 2 additions & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -37,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)

const (
Expand Down

0 comments on commit 215dee2

Please sign in to comment.