Skip to content

Commit

Permalink
[R4R]reannounce local pending transactions (#570)
Browse files Browse the repository at this point in the history
* reannouce local pending transactions

* add tests for tx_pool reannouce local pending transactions

* add tests for handler reannounce local pending transactions
  • Loading branch information
keefel authored Nov 25, 2021
1 parent 1d94308 commit fbc52de
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
utils.GCModeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
},
},
{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ var (
Usage: "Maximum amount of time non-executable transaction are queued",
Value: ethconfig.Defaults.TxPool.Lifetime,
}
TxPoolReannounceTimeFlag = cli.DurationFlag{
Name: "txpool.reannouncetime",
Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)",
Value: ethconfig.Defaults.TxPool.ReannounceTime,
}
// Performance tuning settings
CacheFlag = cli.IntFlag{
Name: "cache",
Expand Down Expand Up @@ -1410,6 +1415,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
if ctx.GlobalIsSet(TxPoolReannounceTimeFlag.Name) {
cfg.ReannounceTime = ctx.GlobalDuration(TxPoolReannounceTimeFlag.Name)
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
72 changes: 59 additions & 13 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024
)

var (
Expand Down Expand Up @@ -88,6 +91,7 @@ var (
var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
reannounceInterval = time.Minute // Time interval to check for reannounce transactions
)

var (
Expand Down Expand Up @@ -152,7 +156,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand All @@ -169,7 +174,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -208,6 +214,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
}
if conf.ReannounceTime < time.Minute {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
return conf
}

Expand All @@ -219,14 +229,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
Expand Down Expand Up @@ -323,14 +334,16 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
reannounce = time.NewTicker(reannounceInterval)
journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop()
defer evict.Stop()
defer reannounce.Stop()
defer journal.Stop()

for {
Expand Down Expand Up @@ -378,6 +391,33 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()

case <-reannounce.C:
pool.mu.RLock()
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}

for _, tx := range list.Flatten() {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
if len(txs) >= txReannoMaxNum {
return txs
}
}
}
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
pool.reannoTxFeed.Send(ReannoTxsEvent{reannoTxs})
}

// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
Expand Down Expand Up @@ -412,6 +452,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscripti
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- ReannoTxsEvent) event.Subscription {
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down
41 changes: 41 additions & 0 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,47 @@ func TestTransactionSlotCount(t *testing.T) {
}
}

// Tests the local pending transaction announced again correctly.
func TestTransactionPendingReannouce(t *testing.T) {
t.Parallel()

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}

config := testTxPoolConfig
// This ReannounceTime will be modified to time.Minute when creating tx_pool.
config.ReannounceTime = time.Second
reannounceInterval = time.Second

pool := NewTxPool(config, params.TestChainConfig, blockchain)
// Modify ReannounceTime to trigger quicker.
pool.config.ReannounceTime = time.Second
defer pool.Stop()

key, _ := crypto.GenerateKey()
account := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.AddBalance(account, big.NewInt(1000000))

events := make(chan ReannoTxsEvent, testTxPoolConfig.AccountQueue)
sub := pool.reannoTxFeed.Subscribe(events)
defer sub.Unsubscribe()

// Generate a batch of transactions and add to tx_pool locally.
txs := make([]*types.Transaction, 0, testTxPoolConfig.AccountQueue)
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
txs = append(txs, transaction(i, 100000, key))
}
pool.AddLocals(txs)

select {
case ev := <-events:
t.Logf("received reannouce event, txs length: %d", len(ev.Txs))
case <-time.After(5 * time.Second):
t.Errorf("reannouce event not fired")
}
}

// Benchmarks the speed of validating the contents of the pending queue of the
// transaction pool.
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
Expand Down
5 changes: 5 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type TxData interface {
setSignatureValues(chainID, v, r, s *big.Int)
}

// Time returns transaction's time
func (tx *Transaction) Time() time.Time {
return tx.time
}

// EncodeRLP implements rlp.Encoder
func (tx *Transaction) EncodeRLP(w io.Writer) error {
if tx.Type() == LegacyTxType {
Expand Down
51 changes: 51 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// SubscribeReannoTxsEvent should return an event subscription of
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -120,6 +124,8 @@ type handler struct {
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription

whitelist map[uint64]common.Hash
Expand Down Expand Up @@ -432,6 +438,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// announce local pending transactions again
h.wg.Add(1)
h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh)
go h.txReannounceLoop()

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
Expand All @@ -445,6 +457,7 @@ func (h *handler) Start(maxPeers int) {

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit chainSync and txsync64.
Expand Down Expand Up @@ -549,6 +562,31 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
"tx packs", directPeers, "broadcast txs", directCount)
}

// ReannounceTransactions will announce a batch of local pending transactions
// to a square root of all peers.
func (h *handler) ReannounceTransactions(txs types.Transactions) {
var (
annoCount int // Count of announcements made
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
)

// Announce transactions hash to a batch of peers
peersCount := uint(math.Sqrt(float64(h.peers.len())))
peers := h.peers.headPeers(peersCount)
for _, tx := range txs {
for _, peer := range peers {
annos[peer] = append(annos[peer], tx.Hash())
}
}

for peer, hashes := range annos {
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction reannounce", "txs", len(txs),
"announce packs", peersCount, "announced hashes", annoCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand All @@ -573,3 +611,16 @@ func (h *handler) txBroadcastLoop() {
}
}
}

// txReannounceLoop announces local pending transactions to connected peers again.
func (h *handler) txReannounceLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.reannoTxsCh:
h.ReannounceTransactions(event.Txs)
case <-h.reannoTxsSub.Err():
return
}
}
}
53 changes: 53 additions & 0 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
}
}

// Tests that local pending transactions get propagated to peers.
func TestTransactionPendingReannounce(t *testing.T) {
t.Parallel()

// Create a source handler to announce transactions from and a sink handler
// to receive them.
source := newTestHandler()
defer source.close()

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(sink.handler), peer)
})

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

txs[nonce] = tx
}
source.txpool.ReannouceTransactions(txs)

for arrived := 0; arrived < len(txs); {
select {
case event := <-txCh:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
}
}
}

// Tests that post eth protocol handshake, clients perform a mutual checkpoint
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
Expand Down
Loading

0 comments on commit fbc52de

Please sign in to comment.