Skip to content

Commit

Permalink
core: prefetch next block state concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Apr 1, 2019
1 parent 86e7790 commit bb9631c
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 75 deletions.
96 changes: 51 additions & 45 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)

ErrNoGenesis = errors.New("Genesis not found in chain")
)

Expand Down Expand Up @@ -126,7 +129,6 @@ type BlockChain struct {
genesisBlock *types.Block

chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock

checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
Expand All @@ -145,10 +147,11 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
Expand Down Expand Up @@ -189,8 +192,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
vmConfig: vmConfig,
badBlocks: badBlocks,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
Expand Down Expand Up @@ -381,31 +385,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block {
return bc.currentFastBlock.Load().(*types.Block)
}

// SetProcessor sets the processor required for making state modifications.
func (bc *BlockChain) SetProcessor(processor Processor) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.processor = processor
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (bc *BlockChain) SetValidator(validator Validator) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.validator = validator
}

// Validator returns the current validator.
func (bc *BlockChain) Validator() Validator {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.validator
}

// Processor returns the current processor.
func (bc *BlockChain) Processor() Processor {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.processor
}

Expand Down Expand Up @@ -1147,7 +1133,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// If the chain is terminating, don't even bother starting u
// If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
Expand Down Expand Up @@ -1175,7 +1161,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
defer close(abort)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.Validator())
it := newInsertIterator(chain, results, bc.validator)

block, err := it.next()

Expand Down Expand Up @@ -1238,54 +1224,74 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
state, err := state.New(parent.Root, bc.stateCache)
statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
}
// Process block using the parent state as reference point.
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32

if followup, err := it.peek(); followup != nil && err == nil {
go func(start time.Time) {
throwaway, _ := state.New(parent.Root, bc.stateCache)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(&followupInterrupt) == 1 {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now())
}
// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them

triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation
trieproc := state.AccountReads + state.AccountUpdates
trieproc += state.StorageReads + state.StorageUpdates
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.StorageReads + statedb.StorageUpdates

blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)

// Validate the state using the default validator
substart = time.Now()
if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
proctime := time.Since(start)

// Update the metrics touched during block validation
accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them

blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash))
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))

// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, state)
status, err := bc.writeBlockWithState(block, receipts, statedb)
if err != nil {
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
atomic.StoreUint32(&followupInterrupt, 1)

// Update the metrics touched during block commit
accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them

blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
blockInsertTimer.UpdateSince(start)

switch status {
Expand Down
43 changes: 37 additions & 6 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor

// insertIterator is a helper to assist during chain import.
type insertIterator struct {
chain types.Blocks
results <-chan error
index int
validator Validator
chain types.Blocks // Chain of blocks being iterated over

results <-chan error // Verification result sink from the consensus engine
errors []error // Header verification errors for the blocks

index int // Current offset of the iterator
validator Validator // Validator to run if verification succeeds
}

// newInsertIterator creates a new iterator based on the given blocks, which are
Expand All @@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
return &insertIterator{
chain: chain,
results: results,
errors: make([]error, 0, len(chain)),
index: -1,
validator: validator,
}
Expand All @@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
// Advance the iterator and wait for verification result if not yet done
it.index++
if err := <-it.results; err != nil {
return it.chain[it.index], err
if len(it.errors) <= it.index {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index] != nil {
return it.chain[it.index], it.errors[it.index]
}
// Block header valid, run body validation and return
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}

// peek returns the next block in the iterator, along with any potential validation
// error for that block, but does **not** advance the iterator.
//
// Both header and body validation errors (nil too) is cached into the iterator
// to avoid duplicating work on the following next() call.
func (it *insertIterator) peek() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
return nil, nil
}
// Wait for verification result if not yet done
if len(it.errors) <= it.index+1 {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index+1] != nil {
return it.chain[it.index+1], it.errors[it.index+1]
}
// Block header valid, ignore body validation since we don't have a parent anyway
return it.chain[it.index+1], nil
}

// previous returns the previous header that was being processed, or nil.
func (it *insertIterator) previous() *types.Header {
if it.index < 1 {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, vm.Config{})
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
85 changes: 85 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/params"
)

// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
// of an arbitrary state with the goal of prefetching potentially useful state
// data from disk before the main block processor start executing.
type statePrefetcher struct {
config *params.ChainConfig // Chain configuration options
bc *BlockChain // Canonical block chain
engine consensus.Engine // Consensus engine used for block rewards
}

// newStatePrefetcher initialises a new statePrefetcher.
func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
return &statePrefetcher{
config: config,
bc: bc,
engine: engine,
}
}

// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
var (
header = block.Header()
gaspool = new(GasPool).AddGas(block.GasLimit())
)
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
return
}
// Block precaching permitted to continue, execute the transaction
statedb.Prepare(tx.Hash(), block.Hash(), i)
if err := precacheTransaction(p.config, p.bc, nil, gaspool, statedb, header, tx, cfg); err != nil {
return // Ugh, something went horribly wrong, bail out
}
}
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
func precacheTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gaspool *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, cfg vm.Config) error {
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
if err != nil {
return err
}
// Create the EVM and execute the transaction
context := NewEVMContext(msg, header, bc, author)
vm := vm.NewEVM(context, statedb, config, cfg)

_, _, _, err = ApplyMessage(vm, msg, gaspool)
return err
}
17 changes: 11 additions & 6 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
// Validator is an interface which defines the standard for block validation. It
// is only responsible for validating block contents, as the header validation is
// done by the specific consensus engines.
//
type Validator interface {
// ValidateBody validates the given block's content.
ValidateBody(block *types.Block) error
Expand All @@ -35,12 +34,18 @@ type Validator interface {
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
type Prefetcher interface {
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
}

// Processor is an interface for processing blocks using a given initial state.
//
// Process takes the block to be processed and the statedb upon which the
// initial state is based. It should return the receipts generated, amount
// of gas used in the process and return an error if any of the internal rules
// failed.
type Processor interface {
// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
}
Loading

0 comments on commit bb9631c

Please sign in to comment.