Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]Prefetch state data on mining process #803

Merged
merged 11 commits into from
Mar 28, 2022
6 changes: 6 additions & 0 deletions core/gaspool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
// in a block. The zero value is a pool with zero gas available.
type GasPool uint64

// SetGas set an initial value for gaspool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set -> sets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

func (gp *GasPool) SetGas(amount uint64) *GasPool {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
setunapo marked this conversation as resolved.
Show resolved Hide resolved
*(*uint64)(gp) = amount
return gp
}

// AddGas makes gas available for execution.
func (gp *GasPool) AddGas(amount uint64) *GasPool {
if uint64(*gp) > math.MaxUint64-amount {
Expand Down
58 changes: 58 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

const prefetchThread = 2
const checkInterval = 10

// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
// of an arbitrary state with the goal of prefetching potentially useful state
Expand Down Expand Up @@ -88,6 +89,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
}
}

// PrefetchMining processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
var signer = types.MakeSigner(p.config, header.Number)

txCh := make(chan *types.Transaction, 2*prefetchThread)
for i := 0; i < prefetchThread; i++ {
go func(txCh <-chan *types.Transaction, stopCh <-chan struct{}) {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
idx := 0
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for {
select {
case tx := <-txCh:
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(signer)
setunapo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return // Also invalid block, bail out
}
idx++
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool.SetGas(gasLimit)
setunapo marked this conversation as resolved.
Show resolved Hide resolved
case <-stopCh:
return
}
}
}(txCh, interruptCh)
}
go func(txs *types.TransactionsByPriceAndNonce) {
count := 0
for {
tx := txs.Peek()
if tx == nil {
return
}
select {
case <-interruptCh:
return
default:
}
if count++; count%checkInterval == 0 {
if *txCurr == nil {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
return
}
txs.Forward(*txCurr)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
}
txCh <- tx
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txs.Shift()
}
}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
Expand Down
2 changes: 2 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Prefetcher interface {
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down
50 changes: 50 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
}
}

// Copy copy a new TransactionsPriceAndNonce with the same *transaction
setunapo marked this conversation as resolved.
Show resolved Hide resolved
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
heads := make([]*Transaction, len(t.heads))
copy(heads, t.heads)
txs := make(map[common.Address]Transactions, len(t.txs))
for acc, txsTmp := range t.txs {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txs[acc] = txsTmp
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
}
return &TransactionsByPriceAndNonce{
heads: heads,
txs: txs,
signer: t.signer,
setunapo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
Expand Down Expand Up @@ -488,6 +503,41 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
return len(t.heads)
}

//Forward move t to be one index behind tx, param tx cant be nil
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
//get the sender address of tx
acc, _ := Sender(t.signer, tx)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
setunapo marked this conversation as resolved.
Show resolved Hide resolved
for _, head := range t.heads {
accTmp, _ := Sender(t.signer, head)
setunapo marked this conversation as resolved.
Show resolved Hide resolved
if acc == accTmp {
//found element in t.headers euqals to tx which means they point to the same transaction
if tx == head {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
//shift t to the position one after tx
txTmp := t.Peek()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
for _, txTmp := range t.txs[accTmp] {
//found the same pointer in t.txs as tx and then shift t to the position one after tx
if txTmp == tx {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
txTmp = t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
return
}
}
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
69 changes: 69 additions & 0 deletions core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
}
}

func TestTransactionForward(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}

// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

tx.time = time.Unix(0, int64(len(keys)-start))
tx2.time = time.Unix(1, int64(len(keys)-start))

groups[addr] = append(groups[addr], tx)
groups[addr] = append(groups[addr], tx2)

}
// Sort the transactions
txset := NewTransactionsByPriceAndNonce(signer, groups)
txsetCpy := txset.Copy()

txs := Transactions{}
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
txs = append(txs, tx)
txsetCpy.Shift()
}

tmp := txset.Copy()
for j := 0; j < 10; j++ {
txset = tmp.Copy()
txsetCpy = tmp.Copy()
i := 0
for ; i < j; i++ {
txset.Shift()
}
tx := txset.Peek()
txsetCpy.Forward(tx)
txCpy := txsetCpy.Peek()
if txCpy == nil {
if tx == nil {
continue
}
txset.Shift()
if txset.Peek() != nil {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
} else {
continue
}
}
txset.Shift()
for ; i < len(txs)-1; i++ {
tx = txset.Peek()
txCpy = txsetCpy.Peek()
if txCpy != tx {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
}
txsetCpy.Shift()
txset.Shift()
}

}
}

// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
Expand Down
12 changes: 11 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type intervalAdjust struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
prefetcher core.Prefetcher
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
Expand Down Expand Up @@ -196,6 +197,7 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
config: config,
chainConfig: chainConfig,
engine: engine,
Expand Down Expand Up @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

interruptCh := make(chan struct{})
defer close(interruptCh)
tx := &types.Transaction{}
txCurr := &tx
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand Down Expand Up @@ -814,7 +824,7 @@ LOOP:
}
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
tx = txs.Peek()
if tx == nil {
break
}
Expand Down