Skip to content

core, eth, les, tests, trie: abstract node scheme #25532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/trie"
"github.com/urfave/cli/v2"
)

Expand All @@ -48,7 +49,7 @@ var (
Name: "init",
Usage: "Bootstrap and initialize a new genesis block",
ArgsUsage: "<genesisPath>",
Flags: utils.DatabasePathFlags,
Flags: flags.Merge([]cli.Flag{utils.CachePreimagesFlag}, utils.DatabasePathFlags),
Description: `
The init command initializes a new genesis block and definition for the network.
This is a destructive action and changes the network in which you will be
Expand Down Expand Up @@ -188,12 +189,16 @@ func initGenesis(ctx *cli.Context) error {
// Open and initialise both full and light databases
stack, _ := makeConfigNode(ctx)
defer stack.Close()

for _, name := range []string{"chaindata", "lightchaindata"} {
chaindb, err := stack.OpenDatabaseWithFreezer(name, 0, 0, ctx.String(utils.AncientFlag.Name), "", false)
if err != nil {
utils.Fatalf("Failed to open database: %v", err)
}
_, hash, err := core.SetupGenesisBlock(chaindb, genesis)
triedb := trie.NewDatabaseWithConfig(chaindb, &trie.Config{
Preimages: ctx.Bool(utils.CachePreimagesFlag.Name),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable preimage-recording by default? Or just based on the flags passed by users.

})
_, hash, err := core.SetupGenesisBlock(chaindb, triedb, genesis)
if err != nil {
utils.Fatalf("Failed to write genesis block: %v", err)
}
Expand Down Expand Up @@ -460,7 +465,10 @@ func dump(ctx *cli.Context) error {
if err != nil {
return err
}
state, err := state.New(root, state.NewDatabase(db), nil)
config := &trie.Config{
Preimages: true, // always enable preimage lookup
}
state, err := state.New(root, state.NewDatabaseWithConfig(db, config), nil)
if err != nil {
return err
}
Expand Down
68 changes: 34 additions & 34 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,12 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

// txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved:
Expand Down Expand Up @@ -200,7 +202,6 @@ type BlockChain struct {
currentFinalizedBlock atomic.Value // Current finalized head
currentSafeBlock atomic.Value // Current safe head

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache[common.Hash, *types.Body]
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
Expand Down Expand Up @@ -231,10 +232,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig = defaultCacheConfig
}

// Open trie database with provided config
triedb := trie.NewDatabaseWithConfig(db, &trie.Config{
Cache: cacheConfig.TrieCleanLimit,
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
})
// Setup the genesis block, commit the provided genesis specification
// to database if the genesis block is not present yet, or load the
// stored one from database.
chainConfig, genesisHash, genesisErr := SetupGenesisBlockWithOverride(db, genesis, overrides)
chainConfig, genesisHash, genesisErr := SetupGenesisBlockWithOverride(db, triedb, genesis, overrides)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
Expand All @@ -247,15 +254,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
log.Info("")

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithConfig(db, &trie.Config{
Cache: cacheConfig.TrieCleanLimit,
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
}),
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
triegc: prque.New(nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
Expand All @@ -268,6 +271,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
vmConfig: vmConfig,
}
bc.forker = NewForkChoice(bc, shouldPreserve)
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
Expand Down Expand Up @@ -300,7 +304,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}
// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
if !bc.HasState(head.Root()) {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
Expand Down Expand Up @@ -388,7 +392,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer > head.NumberU64() {
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.NumberU64() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a mistake before. If we rewind the chain state to disk layer, then in this case recovery mode should be enabled.

log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
Expand All @@ -398,7 +402,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.stateCache.TrieDB(), head.Root())
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root())
}

// Start future block processor.
Expand All @@ -411,11 +415,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
bc.cacheConfig.TrieCleanRejournal = time.Minute
}
triedb := bc.stateCache.TrieDB()
bc.wg.Add(1)
go func() {
defer bc.wg.Done()
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
bc.triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}
// Rewind the chain in case of an incompatible config upgrade.
Expand Down Expand Up @@ -594,7 +597,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
}
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
if !bc.HasState(newHeadBlock.Root()) {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
parent := bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
Expand All @@ -617,7 +620,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
// if the historical chain pruning is enabled. In that case the logic
// needs to be improved here.
if !bc.HasState(bc.genesisBlock.Root()) {
if err := CommitGenesisState(bc.db, bc.genesisBlock.Hash()); err != nil {
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Debug("Recommitted genesis state to disk")
Expand Down Expand Up @@ -900,7 +903,7 @@ func (bc *BlockChain) Stop() {
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()
triedb := bc.triedb

for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
Expand Down Expand Up @@ -932,8 +935,7 @@ func (bc *BlockChain) Stop() {
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
if bc.cacheConfig.TrieCleanJournal != "" {
triedb := bc.stateCache.TrieDB()
triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
bc.triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
}
log.Info("Blockchain stopped")
}
Expand Down Expand Up @@ -1306,24 +1308,22 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if err != nil {
return err
}
triedb := bc.stateCache.TrieDB()

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return triedb.Commit(root, false, nil)
return bc.triedb.Commit(root, false, nil)
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
nodes, imgs = bc.triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory
Expand All @@ -1342,7 +1342,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)
bc.triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
Expand All @@ -1354,7 +1354,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash))
bc.triedb.Dereference(root.(common.Hash))
}
}
}
Expand Down Expand Up @@ -1760,7 +1760,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
stats.processed++
stats.usedGas += usedGas

dirty, _ := bc.stateCache.TrieDB().Size()
dirty, _ := bc.triedb.Size()
stats.report(chain, it.index, dirty, setHead)

if !setHead {
Expand Down
6 changes: 6 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

// CurrentHeader retrieves the current head header of the canonical chain. The
Expand Down Expand Up @@ -375,6 +376,11 @@ func (bc *BlockChain) TxLookupLimit() uint64 {
return bc.txLookupLimit
}

// TrieDB retrieves the low level trie database used for data storage.
func (bc *BlockChain) TrieDB() *trie.Database {
return bc.triedb
}

// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
Expand Down
3 changes: 2 additions & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

// BlockGen creates blocks for testing.
Expand Down Expand Up @@ -308,7 +309,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
// then generate chain on top.
func GenerateChainWithGenesis(genesis *Genesis, engine consensus.Engine, n int, gen func(int, *BlockGen)) (ethdb.Database, []*types.Block, []types.Receipts) {
db := rawdb.NewMemoryDatabase()
_, err := genesis.Commit(db)
_, err := genesis.Commit(db, trie.NewDatabase(db))
if err != nil {
panic(err)
}
Expand Down
Loading