Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimization: txpool pending cache improvement #177

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
79 changes: 74 additions & 5 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
Expand All @@ -20,12 +22,18 @@ type cacheForMiner struct {
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
}
}

Expand Down Expand Up @@ -67,8 +75,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -77,11 +86,71 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
}
}
pc.txLock.Unlock()
for _, txs := range pending {

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending

//pendingLazy := make(map[common.Address][]*txpool.LazyTransaction)
//var txnum = 0
//for addr, txs := range pending {
// // If the miner requests tip enforcement, cap the lists now
// if enforceTip && !pc.IsLocal(addr) {
// for i, tx := range txs {
// if tx.Tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
// txs = txs[:i]
// break
// }
// }
// }
// if len(txs) > 0 {
// lazies := make([]*txpool.LazyTransaction, len(txs))
// for i, tx := range txs {
// lazies[i] = tx
// txnum++
// }
// pendingLazy[addr] = lazies
// }
//}
//log.Info("cacheForMiner dump", "duration", time.Since(start), "accounts", len(pending), "txs", txnum)
//return pendingLazy
}

func (pc *cacheForMiner) markLocal(addr common.Address) {
Expand All @@ -91,7 +160,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) isLocal(addr common.Address) bool {
func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
79 changes: 41 additions & 38 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -383,9 +386,27 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -624,57 +645,35 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// TODO need to confirm
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
return txs
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1470,6 +1469,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.SetBaseFee(pendingBaseFee)
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
2 changes: 2 additions & 0 deletions ethclient/simulated/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ func (n *Backend) Close() error {

// Commit seals a block and moves the chain forward to a new empty block.
func (n *Backend) Commit() common.Hash {
// wait for the transactions to be sync into cache
time.Sleep(350 * time.Millisecond)
return n.beacon.Commit()
}

Expand Down
1 change: 1 addition & 0 deletions ethclient/simulated/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestForkResendTx(t *testing.T) {
t.Fatalf("could not create transaction: %v", err)
}
client.SendTransaction(ctx, tx)
time.Sleep(1 * time.Second)
sim.Commit()

// 3.
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
pendingBlobTxs := w.eth.TxPool().Pending(filter)

packFromTxpoolTimer.UpdateSince(start)
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash())
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs))

// Split the pending transactions into locals and remotes.
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
Expand Down
Loading