From 91d7cd5fd25bda42931cd48ad5d538361f417cb0 Mon Sep 17 00:00:00 2001 From: Leon <316032931@qq.com> Date: Mon, 28 Mar 2022 10:12:38 +0800 Subject: [PATCH] [R4R]Prefetch state data on mining process (#803) * perf(miner):add mining prefetcher * fix ineffassign * fix comments * fix comments * fix comments: add AsMessagePrefetch to skip nonce check * fix comment:refactor check order of method Forward * fix comments:rename variables * fix comments: rename * rename * fix comments: refactor * update --- core/state_prefetcher.go | 58 ++++++++++++++++++++++++++++ core/types.go | 2 + core/types/transaction.go | 62 ++++++++++++++++++++++++++++++ core/types/transaction_test.go | 69 ++++++++++++++++++++++++++++++++++ miner/worker.go | 12 +++++- 5 files changed, 202 insertions(+), 1 deletion(-) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index d559a03a0f..0e18bdc8bb 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -27,6 +27,7 @@ import ( ) const prefetchThread = 2 +const checkInterval = 10 // statePrefetcher is a basic Prefetcher, which blindly executes a block on top // of an arbitrary state with the goal of prefetching potentially useful state @@ -88,6 +89,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c } } +// PrefetchMining processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb, but any changes are discarded. The +// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage +func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) { + var signer = types.MakeSigner(p.config, header.Number) + + txCh := make(chan *types.Transaction, 2*prefetchThread) + for i := 0; i < prefetchThread; i++ { + go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) { + idx := 0 + newStatedb := statedb.Copy() + gaspool := new(GasPool).AddGas(gasLimit) + blockContext := NewEVMBlockContext(header, p.bc, nil) + evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // Iterate over and process the individual transactions + for { + select { + case tx := <-startCh: + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessageNoNonceCheck(signer) + if err != nil { + return // Also invalid block, bail out + } + idx++ + newStatedb.Prepare(tx.Hash(), header.Hash(), idx) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + gaspool = new(GasPool).AddGas(gasLimit) + case <-stopCh: + return + } + } + }(txCh, interruptCh) + } + go func(txset *types.TransactionsByPriceAndNonce) { + count := 0 + for { + tx := txset.Peek() + if tx == nil { + return + } + select { + case <-interruptCh: + return + default: + } + if count++; count%checkInterval == 0 { + if *txCurr == nil { + return + } + txset.Forward(*txCurr) + } + txCh <- tx + txset.Shift() + } + }(txs) +} + // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. diff --git a/core/types.go b/core/types.go index 5ed4817e68..61722aea74 100644 --- a/core/types.go +++ b/core/types.go @@ -40,6 +40,8 @@ type Prefetcher interface { // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + // PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage. + PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/transaction.go b/core/types/transaction.go index 74c011544b..e95cec25a6 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa } } +// Copy copys a new TransactionsPriceAndNonce with the same *transaction +func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce { + heads := make([]*Transaction, len(t.heads)) + copy(heads, t.heads) + txs := make(map[common.Address]Transactions, len(t.txs)) + for acc, txsTmp := range t.txs { + txs[acc] = txsTmp + } + return &TransactionsByPriceAndNonce{ + heads: heads, + txs: txs, + signer: t.signer, + } +} + // Peek returns the next transaction by price. func (t *TransactionsByPriceAndNonce) Peek() *Transaction { if len(t.heads) == 0 { @@ -488,6 +503,44 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int { return len(t.heads) } +//Forward moves current transaction to be the one which is one index after tx +func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) { + if tx == nil { + t.heads = t.heads[0:0] + return + } + //check whether target tx exists in t.heads + for _, head := range t.heads { + if tx == head { + //shift t to the position one after tx + txTmp := t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + //get the sender address of tx + acc, _ := Sender(t.signer, tx) + //check whether target tx exists in t.txs + if txs, ok := t.txs[acc]; ok { + for _, txTmp := range txs { + //found the same pointer in t.txs as tx and then shift t to the position one after tx + if txTmp == tx { + txTmp = t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + } +} + // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. @@ -535,6 +588,15 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) { return msg, err } +// AsMessageNoNonceCheck returns the transaction with checkNonce field set to be false. +func (tx *Transaction) AsMessageNoNonceCheck(s Signer) (Message, error) { + msg, err := tx.AsMessage(s) + if err == nil { + msg.checkNonce = false + } + return msg, err +} + func (m Message) From() common.Address { return m.from } func (m Message) To() *common.Address { return m.to } func (m Message) GasPrice() *big.Int { return m.gasPrice } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 3cece9c235..c81b7b8647 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) { } } +func TestTransactionForward(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address]Transactions{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + + tx.time = time.Unix(0, int64(len(keys)-start)) + tx2.time = time.Unix(1, int64(len(keys)-start)) + + groups[addr] = append(groups[addr], tx) + groups[addr] = append(groups[addr], tx2) + + } + // Sort the transactions + txset := NewTransactionsByPriceAndNonce(signer, groups) + txsetCpy := txset.Copy() + + txs := Transactions{} + for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() { + txs = append(txs, tx) + txsetCpy.Shift() + } + + tmp := txset.Copy() + for j := 0; j < 10; j++ { + txset = tmp.Copy() + txsetCpy = tmp.Copy() + i := 0 + for ; i < j; i++ { + txset.Shift() + } + tx := txset.Peek() + txsetCpy.Forward(tx) + txCpy := txsetCpy.Peek() + if txCpy == nil { + if tx == nil { + continue + } + txset.Shift() + if txset.Peek() != nil { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } else { + continue + } + } + txset.Shift() + for ; i < len(txs)-1; i++ { + tx = txset.Peek() + txCpy = txsetCpy.Peek() + if txCpy != tx { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } + txsetCpy.Shift() + txset.Shift() + } + + } +} + // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() diff --git a/miner/worker.go b/miner/worker.go index 28ef170e40..2224dc0071 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -130,6 +130,7 @@ type intervalAdjust struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { + prefetcher core.Prefetcher config *Config chainConfig *params.ChainConfig engine consensus.Engine @@ -196,6 +197,7 @@ type worker struct { func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { worker := &worker{ + prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), config: config, chainConfig: chainConfig, engine: engine, @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) + interruptCh := make(chan struct{}) + defer close(interruptCh) + tx := &types.Transaction{} + txCurr := &tx + //prefetch txs from all pending txs + txsPrefetch := txs.Copy() + w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) + LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -814,7 +824,7 @@ LOOP: } } // Retrieve the next transaction and abort if all done - tx := txs.Peek() + tx = txs.Peek() if tx == nil { break }