Skip to content

Commit

Permalink
perf(miner):add mining prefetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
qinglin89 committed Mar 16, 2022
1 parent 21a3b11 commit 14e4f7f
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 0 deletions.
9 changes: 9 additions & 0 deletions core/gaspool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
// in a block. The zero value is a pool with zero gas available.
type GasPool uint64

// SetGas set an initial value for gaspool
func (gp *GasPool) SetGas(amount uint64) *GasPool {
if amount > math.MaxUint64 {
panic("gas pool pushed above uint64")
}
*(*uint64)(gp) = amount
return gp
}

// AddGas makes gas available for execution.
func (gp *GasPool) AddGas(amount uint64) *GasPool {
if uint64(*gp) > math.MaxUint64-amount {
Expand Down
57 changes: 57 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
}
}

// PrefetchMining processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
var signer = types.MakeSigner(p.config, header.Number)

txCh := make(chan *types.Transaction, 2*prefetchThread)
for i := 0; i < prefetchThread; i++ {
go func(txCh <-chan *types.Transaction, stopCh <-chan struct{}) {
idx := 0
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for {
select {
case tx := <-txCh:
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(signer)
if err != nil {
return // Also invalid block, bail out
}
idx++
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool.SetGas(gasLimit)
case <-stopCh:
return
}
}
}(txCh, interruptCh)
}
go func(txs *types.TransactionsByPriceAndNonce) {
count := 0
for {
tx := txs.Peek()
if tx == nil {
return
}
select {
case <-interruptCh:
return
default:
}
if count++; count%10 == 0 {
if *txCurr == nil {
return
}
txs.Forward(*txCurr)
}
txCh <- tx
txs.Shift()
}
}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
Expand Down
2 changes: 2 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Prefetcher interface {
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down
56 changes: 56 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
}
}

// Copy copy a new TransactionsPriceAndNonce with the same *transaction
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
heads := make([]*Transaction, len(t.heads))
copy(heads, t.heads)
txs := make(map[common.Address]Transactions, len(t.txs))
for acc, txsTmp := range t.txs {
txs[acc] = txsTmp
}
return &TransactionsByPriceAndNonce{
heads: heads,
txs: txs,
signer: t.signer,
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
Expand Down Expand Up @@ -488,6 +503,47 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
return len(t.heads)
}

//Forward move t to be one index behind tx, tx cant be nil
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
if tx == nil {
txTmp := t.Peek()
for txTmp != nil {
t.Shift()
txTmp = t.Peek()
}
return
}

l := len(t.heads)
acc, _ := Sender(t.signer, tx)
for i := 0; i < l; i++ {
accTmp, _ := Sender(t.signer, t.heads[i])
if acc == accTmp {
if tx == t.heads[i] {
txTmp := t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
for _, txTmp := range t.txs[accTmp] {
if txTmp == tx {
txTmp = t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
return
}
}
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
69 changes: 69 additions & 0 deletions core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
}
}

func TestTransactionForward(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}

// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

tx.time = time.Unix(0, int64(len(keys)-start))
tx2.time = time.Unix(1, int64(len(keys)-start))

groups[addr] = append(groups[addr], tx)
groups[addr] = append(groups[addr], tx2)

}
// Sort the transactions
txset := NewTransactionsByPriceAndNonce(signer, groups)
txsetCpy := txset.Copy()

txs := Transactions{}
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
txs = append(txs, tx)
txsetCpy.Shift()
}

tmp := txset.Copy()
for j := 0; j < 10; j++ {
txset = tmp.Copy()
txsetCpy = tmp.Copy()
i := 0
for ; i < j; i++ {
txset.Shift()
}
tx := txset.Peek()
txsetCpy.Forward(tx)
txCpy := txsetCpy.Peek()
if txCpy == nil {
if tx == nil {
continue
}
txset.Shift()
if txset.Peek() != nil {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
} else {
continue
}
}
txset.Shift()
for ; i < len(txs)-1; i++ {
tx = txset.Peek()
txCpy = txsetCpy.Peek()
if txCpy != tx {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
}
txsetCpy.Shift()
txset.Shift()
}

}
}

// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
Expand Down
12 changes: 12 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type intervalAdjust struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
prefetcher core.Prefetcher
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
Expand Down Expand Up @@ -196,6 +197,7 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
config: config,
chainConfig: chainConfig,
engine: engine,
Expand Down Expand Up @@ -778,6 +780,13 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

// var interruptPrefetch uint32
interruptCh := make(chan struct{})
var txCurr **types.Transaction
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand All @@ -798,6 +807,7 @@ LOOP:
inc: true,
}
}
close(interruptCh)
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
Expand All @@ -815,6 +825,7 @@ LOOP:
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
txCurr = &tx
if tx == nil {
break
}
Expand Down Expand Up @@ -868,6 +879,7 @@ LOOP:
txs.Shift()
}
}
close(interruptCh)
bloomProcessors.Close()

if !w.isRunning() && len(coalescedLogs) > 0 {
Expand Down

0 comments on commit 14e4f7f

Please sign in to comment.