Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var (
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolBroadcastPendingLocalTxFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag,
Expand Down
15 changes: 11 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,15 @@ var (
Value: ethconfig.Defaults.TxPool.Rejournal,
Category: flags.TxPoolCategory,
}
TxPoolBroadcastPendingLocalTxFlag = &cli.DurationFlag{
Name: "txpool.broadcastpendinglocaltx",
Usage: "Time interval to broadcast the pending local transaction",
Value: legacypool.DefaultConfig.BroadcastPendingLocalTx,
}
TxPoolPriceLimitFlag = &cli.Uint64Flag{
Name: "txpool.pricelimit",
Usage: "Minimum gas price tip to enforce for acceptance into the pool",
Value: ethconfig.Defaults.TxPool.PriceLimit,
Category: flags.TxPoolCategory,
Name: "txpool.pricelimit",
Usage: "Minimum gas price tip to enforce for acceptance into the pool",
Value: ethconfig.Defaults.TxPool.PriceLimit,
}
TxPoolPriceBumpFlag = &cli.Uint64Flag{
Name: "txpool.pricebump",
Expand Down Expand Up @@ -1443,6 +1447,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolRejournalFlag.Name) {
cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name)
}
if ctx.IsSet(TxPoolBroadcastPendingLocalTxFlag.Name) {
cfg.BroadcastPendingLocalTx = ctx.Duration(TxPoolBroadcastPendingLocalTxFlag.Name)
}
if ctx.IsSet(TxPoolPriceLimitFlag.Name) {
cfg.PriceLimit = ctx.Uint64(TxPoolPriceLimitFlag.Name)
}
Expand Down
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// PendingLocalTxsEvent is posted when there are pending local transactions in the transaction pool.
type PendingLocalTxsEvent struct{ Txs []*types.Transaction }

// NewQueuedTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewQueuedTxsEvent struct{ Txs []*types.Transaction }

Expand Down
14 changes: 8 additions & 6 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
BroadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be removed?

Copy link
Author

@yysung1123 yysung1123 Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I followed the approach of Rejournal.
store the config value in legacypool.Config and use it in eth/backend.go for TxTracker
ref.

rejournal := config.TxPool.Rejournal


PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
Expand All @@ -140,8 +141,9 @@ type Config struct {

// DefaultConfig contains the default configurations for the transaction pool.
var DefaultConfig = Config{
Journal: "transactions.rlp",
Rejournal: time.Hour,
Journal: "transactions.rlp",
Rejournal: time.Hour,
BroadcastPendingLocalTx: 5 * time.Minute,

PriceLimit: 1,
PriceBump: 10,
Expand Down
40 changes: 35 additions & 5 deletions core/txpool/locals/tx_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
Expand All @@ -46,18 +48,22 @@ type TxTracker struct {
all map[common.Hash]*types.Transaction // All tracked transactions
byAddr map[common.Address]*legacypool.SortedMap // Transactions by address

journal *journal // Journal of local transaction to back up to disk
rejournal time.Duration // How often to rotate journal
pool *txpool.TxPool // The tx pool to interact with
signer types.Signer
journal *journal // Journal of local transaction to back up to disk
rejournal time.Duration // How often to rotate journal
broadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction
pool *txpool.TxPool // The tx pool to interact with
signer types.Signer

pendingLocalTxFeed event.Feed

shutdownCh chan struct{}
mu sync.Mutex
wg sync.WaitGroup
}

// New creates a new TxTracker
func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker {
func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool,
broadcastPendingLocalTxTime time.Duration) *TxTracker {
pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*legacypool.SortedMap),
Expand All @@ -69,6 +75,7 @@ func New(journalPath string, journalTime time.Duration, chainConfig *params.Chai
pool.journal = newTxJournal(journalPath)
pool.rejournal = journalTime
}
pool.broadcastPendingLocalTx = broadcastPendingLocalTxTime
return pool
}

Expand Down Expand Up @@ -187,6 +194,10 @@ func (tracker *TxTracker) loop() {
lastJournal = time.Now()
timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
)

pendingLocalTxs := time.NewTicker(tracker.broadcastPendingLocalTx)
defer pendingLocalTxs.Stop()

for {
select {
case <-tracker.shutdownCh:
Expand All @@ -207,6 +218,25 @@ func (tracker *TxTracker) loop() {
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
case <-pendingLocalTxs.C:
lTxs := types.Transactions{}
for addr, lazyTxs := range tracker.pool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) {
if _, ok := tracker.byAddr[addr]; !ok {
continue
}
for _, lazyTx := range lazyTxs {
lTxs = append(lTxs, lazyTx.Tx)
}
}

if len(lTxs) > 0 {
go tracker.pendingLocalTxFeed.Send(core.PendingLocalTxsEvent{Txs: lTxs})
}
}
}
}

// SubscribePendingLocalTransactions subscribes to pending local transaction events.
func (tracker *TxTracker) SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription {
return tracker.pendingLocalTxFeed.Subscribe(ch)
}
16 changes: 16 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type BlockChain interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
}

type PendingLocalTxsPublisher interface {
SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription
}

// TxPool is an aggregator for various transaction specific pools, collectively
// tracking all the transactions deemed interesting by the node. Transactions
// enter the pool when they are received from the network or submitted locally.
Expand All @@ -76,6 +80,8 @@ type TxPool struct {
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done

PendingLocalTxsPublisher PendingLocalTxsPublisher // Publisher for pending local transactions
}

// New creates a new transaction pool to gather, sort and filter inbound
Expand Down Expand Up @@ -386,6 +392,16 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac
return txs
}

// SubscribePendingLocalTxsEvent registers a subscription of PendingLocalTxsEvent and
// starts sending event to the given channel.
func (p *TxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription {
var subs []event.Subscription
if p.PendingLocalTxsPublisher != nil {
subs = append(subs, p.PendingLocalTxsPublisher.SubscribePendingLocalTransactions(ch))
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}

// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
Expand Down
8 changes: 7 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
rejournal = time.Second
}
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
broadcastPendingLocalTx := config.TxPool.BroadcastPendingLocalTx
if broadcastPendingLocalTx < time.Second {
log.Warn("Sanitizing invalid txpool broadcast local tx time", "provided", broadcastPendingLocalTx, "updated", time.Second)
broadcastPendingLocalTx = time.Second
}
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool, broadcastPendingLocalTx)
stack.RegisterLifecycle(eth.localTxTracker)
eth.txPool.PendingLocalTxsPublisher = eth.localTxTracker
Copy link

@vicky-sunshine vicky-sunshine Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note on the main changes in this PR:

The key refactor is in txPool's SubscribePendingLocalTxsEvent:

  • Before: txPool called subPools.SubscribePendingLocalTxsEvent when executing SubscribePendingLocalTxsEvent.
  • After: It now calls PendingLocalTxsPublisher.SubscribePendingLocalTransactions, which is backed by localTxTracker.

Since localTxTracker is optional and the handler only has txPool (but not localTxTracker), we had to pass eth.localTxTracker into txPool at L257.

While the data flow feels a bit indirect (handler → txPool → localTxTracker, instead of handler → localTxTracker), I couldn’t immediately think of a cleaner solution without making broader changes.

Overall, I think this approach is fine, so I’ll approve this.

Reference of old customized commit: bf10d46#diff-4d08b06f78070fa573caaaa4b6b56dd5cc7bbeb03b04d1b93cafa167fc5577e5R374

Copy link
Author

@yysung1123 yysung1123 Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for organizing!

The data flow:

  • before v1.14.13:
    handler -> txPool -> LegacyPool (subpool) -> filter pending by locals (contains the addresses that submitting local tx before)
  • after v1.15.0:
    handler -> txPool -> TxTracker (PendingLocalTxsPublisher) -> query pending txs from LegacyPool and filter them by byAddr (contains the addresses that submitting local tx before)

}
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
Expand Down
47 changes: 44 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
// All transactions with a higher size will be announced and need to be fetched
// by the peer.
txMaxBroadcastSize = 4096

// pendingLocalTxChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
pendingLocalTxChanSize = 4096
)

var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
Expand All @@ -78,6 +82,10 @@ type txPool interface {
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription

// SubscribePendingLocalTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribePendingLocalTxsEvent(chan<- core.PendingLocalTxsEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -111,9 +119,11 @@ type handler struct {
txFetcher *fetcher.TxFetcher
peers *peerSet

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
pendingLocalTxsCh chan core.PendingLocalTxsEvent
pendingLocalTxsSub event.Subscription

requiredBlocks map[uint64]common.Hash

Expand Down Expand Up @@ -424,7 +434,10 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
h.pendingLocalTxsCh = make(chan core.PendingLocalTxsEvent, pendingLocalTxChanSize)
h.pendingLocalTxsSub = h.txpool.SubscribePendingLocalTxsEvent(h.pendingLocalTxsCh)
go h.txBroadcastLoop()
go h.pendingLocalTxBroadcastLoop()

// start sync handlers
h.txFetcher.Start()
Expand All @@ -438,6 +451,7 @@ func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.txFetcher.Stop()
h.downloader.Terminate()
h.pendingLocalTxsSub.Unsubscribe()

// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
Expand Down Expand Up @@ -531,6 +545,20 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
"bcastpeers", len(txset), "bcastcount", directCount, "annpeers", len(annos), "anncount", annCount)
}

// BroadcastPendingLocalTxs will propagate a batch of transactions to all peers
func (h *handler) BroadcastPendingLocalTxs(txs types.Transactions) {
peers := h.peers.Clone()
// Build tx hashes
txHashes := make([]common.Hash, len(txs))
for i, tx := range txs {
txHashes[i] = tx.Hash()
}
for _, peer := range peers {
peer.AsyncSendTransactions(txHashes)
}
log.Info("Broadcast pending local transaction to all peers", "recipients", len(peers))
}

// txBroadcastLoop announces new transactions to connected peers.
func (h *handler) txBroadcastLoop() {
defer h.wg.Done()
Expand All @@ -557,3 +585,16 @@ func (h *handler) enableSyncedFeatures() {
h.snapSync.Store(false)
}
}

func (h *handler) pendingLocalTxBroadcastLoop() {
for {
select {
case event := <-h.pendingLocalTxsCh:
h.BroadcastPendingLocalTxs(event.Txs)

// Err() channel will be closed when unsubscribing.
case <-h.pendingLocalTxsSub.Err():
return
}
}
}
12 changes: 8 additions & 4 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ var (
// Its goal is to get around setting up a valid statedb for the balance and nonce
// checks.
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions

txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
pool map[common.Hash]*types.Transaction // Hash map of collected transactions
pendingLocalTxFeed event.Feed
txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
}

// newTestTxPool creates a mock transaction pool.
Expand Down Expand Up @@ -121,6 +121,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*
return pending
}

func (p *testTxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription {
return p.pendingLocalTxFeed.Subscribe(ch)
}

// SubscribeTransactions should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
Expand Down
12 changes: 12 additions & 0 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,15 @@ func (p *snapPeer) info() *snapPeerInfo {
Version: p.Version(),
}
}

// Clone clones a peers
func (ps *peerSet) Clone() []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
list = append(list, p)
}
return list
}