Skip to content

Commit e14f8a4

Browse files
authored
Merge pull request #19328 from karalabe/preload
core: prefetch next block state concurrently
2 parents 88d7119 + ed34a5e commit e14f8a4

File tree

11 files changed

+224
-87
lines changed

11 files changed

+224
-87
lines changed

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ var (
102102
utils.CacheDatabaseFlag,
103103
utils.CacheTrieFlag,
104104
utils.CacheGCFlag,
105+
utils.CacheNoPrefetchFlag,
105106
utils.ListenPortFlag,
106107
utils.MaxPeersFlag,
107108
utils.MaxPendingPeersFlag,

cmd/geth/usage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ var AppHelpFlagGroups = []flagGroup{
139139
utils.CacheDatabaseFlag,
140140
utils.CacheTrieFlag,
141141
utils.CacheGCFlag,
142+
utils.CacheNoPrefetchFlag,
142143
},
143144
},
144145
{

cmd/utils/flags.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ var (
349349
Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)",
350350
Value: 25,
351351
}
352+
CacheNoPrefetchFlag = cli.BoolFlag{
353+
Name: "cache.noprefetch",
354+
Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
355+
}
352356
// Miner settings
353357
MiningEnabledFlag = cli.BoolFlag{
354358
Name: "mine",
@@ -1336,6 +1340,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
13361340
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
13371341
}
13381342
cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive"
1343+
cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name)
13391344

13401345
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
13411346
cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
@@ -1595,10 +1600,11 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
15951600
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
15961601
}
15971602
cache := &core.CacheConfig{
1598-
Disabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
1599-
TrieCleanLimit: eth.DefaultConfig.TrieCleanCache,
1600-
TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache,
1601-
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
1603+
TrieCleanLimit: eth.DefaultConfig.TrieCleanCache,
1604+
TrieCleanNoPrefetch: ctx.GlobalBool(CacheNoPrefetchFlag.Name),
1605+
TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache,
1606+
TrieDirtyDisabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
1607+
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
16021608
}
16031609
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
16041610
cache.TrieCleanLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100

core/blockchain.go

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ var (
6262
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
6363
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
6464

65+
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
66+
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
67+
6568
ErrNoGenesis = errors.New("Genesis not found in chain")
6669
)
6770

@@ -87,10 +90,11 @@ const (
8790
// CacheConfig contains the configuration values for the trie caching/pruning
8891
// that's resident in a blockchain.
8992
type CacheConfig struct {
90-
Disabled bool // Whether to disable trie write caching (archive node)
91-
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
92-
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
93-
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
93+
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
94+
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
95+
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
96+
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
97+
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
9498
}
9599

96100
// BlockChain represents the canonical chain given a database with a genesis
@@ -126,7 +130,6 @@ type BlockChain struct {
126130
genesisBlock *types.Block
127131

128132
chainmu sync.RWMutex // blockchain insertion lock
129-
procmu sync.RWMutex // block processor lock
130133

131134
checkpoint int // checkpoint counts towards the new checkpoint
132135
currentBlock atomic.Value // Current head of the block chain
@@ -145,10 +148,11 @@ type BlockChain struct {
145148
procInterrupt int32 // interrupt signaler for block processing
146149
wg sync.WaitGroup // chain processing wait group for shutting down
147150

148-
engine consensus.Engine
149-
processor Processor // block processor interface
150-
validator Validator // block and state validator interface
151-
vmConfig vm.Config
151+
engine consensus.Engine
152+
validator Validator // Block and state validator interface
153+
prefetcher Prefetcher // Block state prefetcher interface
154+
processor Processor // Block transaction processor interface
155+
vmConfig vm.Config
152156

153157
badBlocks *lru.Cache // Bad block cache
154158
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
@@ -189,8 +193,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
189193
vmConfig: vmConfig,
190194
badBlocks: badBlocks,
191195
}
192-
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
193-
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
196+
bc.validator = NewBlockValidator(chainConfig, bc, engine)
197+
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
198+
bc.processor = NewStateProcessor(chainConfig, bc, engine)
194199

195200
var err error
196201
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
@@ -381,31 +386,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block {
381386
return bc.currentFastBlock.Load().(*types.Block)
382387
}
383388

384-
// SetProcessor sets the processor required for making state modifications.
385-
func (bc *BlockChain) SetProcessor(processor Processor) {
386-
bc.procmu.Lock()
387-
defer bc.procmu.Unlock()
388-
bc.processor = processor
389-
}
390-
391-
// SetValidator sets the validator which is used to validate incoming blocks.
392-
func (bc *BlockChain) SetValidator(validator Validator) {
393-
bc.procmu.Lock()
394-
defer bc.procmu.Unlock()
395-
bc.validator = validator
396-
}
397-
398389
// Validator returns the current validator.
399390
func (bc *BlockChain) Validator() Validator {
400-
bc.procmu.RLock()
401-
defer bc.procmu.RUnlock()
402391
return bc.validator
403392
}
404393

405394
// Processor returns the current processor.
406395
func (bc *BlockChain) Processor() Processor {
407-
bc.procmu.RLock()
408-
defer bc.procmu.RUnlock()
409396
return bc.processor
410397
}
411398

@@ -722,7 +709,7 @@ func (bc *BlockChain) Stop() {
722709
// - HEAD: So we don't need to reprocess any blocks in the general case
723710
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
724711
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
725-
if !bc.cacheConfig.Disabled {
712+
if !bc.cacheConfig.TrieDirtyDisabled {
726713
triedb := bc.stateCache.TrieDB()
727714

728715
for _, offset := range []uint64{0, 1, triesInMemory - 1} {
@@ -982,7 +969,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
982969
triedb := bc.stateCache.TrieDB()
983970

984971
// If we're running an archive node, always flush
985-
if bc.cacheConfig.Disabled {
972+
if bc.cacheConfig.TrieDirtyDisabled {
986973
if err := triedb.Commit(root, false); err != nil {
987974
return NonStatTy, err
988975
}
@@ -1147,7 +1134,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
11471134
// is imported, but then new canon-head is added before the actual sidechain
11481135
// completes, then the historic state could be pruned again
11491136
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
1150-
// If the chain is terminating, don't even bother starting u
1137+
// If the chain is terminating, don't even bother starting up
11511138
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
11521139
return 0, nil, nil, nil
11531140
}
@@ -1175,7 +1162,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
11751162
defer close(abort)
11761163

11771164
// Peek the error for the first block to decide the directing import logic
1178-
it := newInsertIterator(chain, results, bc.Validator())
1165+
it := newInsertIterator(chain, results, bc.validator)
11791166

11801167
block, err := it.next()
11811168

@@ -1238,54 +1225,76 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
12381225
if parent == nil {
12391226
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
12401227
}
1241-
state, err := state.New(parent.Root, bc.stateCache)
1228+
statedb, err := state.New(parent.Root, bc.stateCache)
12421229
if err != nil {
12431230
return it.index, events, coalescedLogs, err
12441231
}
1245-
// Process block using the parent state as reference point.
1232+
// If we have a followup block, run that against the current state to pre-cache
1233+
// transactions and probabilistically some of the account/storage trie nodes.
1234+
var followupInterrupt uint32
1235+
1236+
if !bc.cacheConfig.TrieCleanNoPrefetch {
1237+
if followup, err := it.peek(); followup != nil && err == nil {
1238+
go func(start time.Time) {
1239+
throwaway, _ := state.New(parent.Root, bc.stateCache)
1240+
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
1241+
1242+
blockPrefetchExecuteTimer.Update(time.Since(start))
1243+
if atomic.LoadUint32(&followupInterrupt) == 1 {
1244+
blockPrefetchInterruptMeter.Mark(1)
1245+
}
1246+
}(time.Now())
1247+
}
1248+
}
1249+
// Process block using the parent state as reference point
12461250
substart := time.Now()
1247-
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
1251+
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
12481252
if err != nil {
12491253
bc.reportBlock(block, receipts, err)
1254+
atomic.StoreUint32(&followupInterrupt, 1)
12501255
return it.index, events, coalescedLogs, err
12511256
}
12521257
// Update the metrics touched during block processing
1253-
accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them
1254-
storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them
1255-
accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them
1256-
storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them
1258+
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
1259+
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
1260+
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
1261+
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
12571262

1258-
triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation
1259-
trieproc := state.AccountReads + state.AccountUpdates
1260-
trieproc += state.StorageReads + state.StorageUpdates
1263+
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
1264+
trieproc := statedb.AccountReads + statedb.AccountUpdates
1265+
trieproc += statedb.StorageReads + statedb.StorageUpdates
12611266

12621267
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
12631268

12641269
// Validate the state using the default validator
12651270
substart = time.Now()
1266-
if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil {
1271+
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
12671272
bc.reportBlock(block, receipts, err)
1273+
atomic.StoreUint32(&followupInterrupt, 1)
12681274
return it.index, events, coalescedLogs, err
12691275
}
12701276
proctime := time.Since(start)
12711277

12721278
// Update the metrics touched during block validation
1273-
accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them
1274-
storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them
1279+
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
1280+
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
12751281

1276-
blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash))
1282+
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
12771283

12781284
// Write the block to the chain and get the status.
12791285
substart = time.Now()
1280-
status, err := bc.writeBlockWithState(block, receipts, state)
1286+
status, err := bc.writeBlockWithState(block, receipts, statedb)
12811287
if err != nil {
1288+
atomic.StoreUint32(&followupInterrupt, 1)
12821289
return it.index, events, coalescedLogs, err
12831290
}
1291+
atomic.StoreUint32(&followupInterrupt, 1)
1292+
12841293
// Update the metrics touched during block commit
1285-
accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them
1286-
storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them
1294+
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
1295+
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
12871296

1288-
blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits)
1297+
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
12891298
blockInsertTimer.UpdateSince(start)
12901299

12911300
switch status {

core/blockchain_insert.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
8080

8181
// insertIterator is a helper to assist during chain import.
8282
type insertIterator struct {
83-
chain types.Blocks
84-
results <-chan error
85-
index int
86-
validator Validator
83+
chain types.Blocks // Chain of blocks being iterated over
84+
85+
results <-chan error // Verification result sink from the consensus engine
86+
errors []error // Header verification errors for the blocks
87+
88+
index int // Current offset of the iterator
89+
validator Validator // Validator to run if verification succeeds
8790
}
8891

8992
// newInsertIterator creates a new iterator based on the given blocks, which are
@@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
9295
return &insertIterator{
9396
chain: chain,
9497
results: results,
98+
errors: make([]error, 0, len(chain)),
9599
index: -1,
96100
validator: validator,
97101
}
@@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
100104
// next returns the next block in the iterator, along with any potential validation
101105
// error for that block. When the end is reached, it will return (nil, nil).
102106
func (it *insertIterator) next() (*types.Block, error) {
107+
// If we reached the end of the chain, abort
103108
if it.index+1 >= len(it.chain) {
104109
it.index = len(it.chain)
105110
return nil, nil
106111
}
112+
// Advance the iterator and wait for verification result if not yet done
107113
it.index++
108-
if err := <-it.results; err != nil {
109-
return it.chain[it.index], err
114+
if len(it.errors) <= it.index {
115+
it.errors = append(it.errors, <-it.results)
116+
}
117+
if it.errors[it.index] != nil {
118+
return it.chain[it.index], it.errors[it.index]
110119
}
120+
// Block header valid, run body validation and return
111121
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
112122
}
113123

124+
// peek returns the next block in the iterator, along with any potential validation
125+
// error for that block, but does **not** advance the iterator.
126+
//
127+
// Both header and body validation errors (nil too) is cached into the iterator
128+
// to avoid duplicating work on the following next() call.
129+
func (it *insertIterator) peek() (*types.Block, error) {
130+
// If we reached the end of the chain, abort
131+
if it.index+1 >= len(it.chain) {
132+
return nil, nil
133+
}
134+
// Wait for verification result if not yet done
135+
if len(it.errors) <= it.index+1 {
136+
it.errors = append(it.errors, <-it.results)
137+
}
138+
if it.errors[it.index+1] != nil {
139+
return it.chain[it.index+1], it.errors[it.index+1]
140+
}
141+
// Block header valid, ignore body validation since we don't have a parent anyway
142+
return it.chain[it.index+1], nil
143+
}
144+
114145
// previous returns the previous header that was being processed, or nil.
115146
func (it *insertIterator) previous() *types.Header {
116147
if it.index < 1 {

core/blockchain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
144144
if err != nil {
145145
return err
146146
}
147-
receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, vm.Config{})
147+
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
148148
if err != nil {
149149
blockchain.reportBlock(block, receipts, err)
150150
return err

0 commit comments

Comments
 (0)