Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]reannounce local pending transactions #570

Merged
merged 3 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
utils.GCModeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
},
},
{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ var (
Usage: "Maximum amount of time non-executable transaction are queued",
Value: ethconfig.Defaults.TxPool.Lifetime,
}
TxPoolReannounceTimeFlag = cli.DurationFlag{
Name: "txpool.reannouncetime",
Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)",
Value: ethconfig.Defaults.TxPool.ReannounceTime,
}
// Performance tuning settings
CacheFlag = cli.IntFlag{
Name: "cache",
Expand Down Expand Up @@ -1410,6 +1415,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
if ctx.GlobalIsSet(TxPoolReannounceTimeFlag.Name) {
cfg.ReannounceTime = ctx.GlobalDuration(TxPoolReannounceTimeFlag.Name)
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
72 changes: 59 additions & 13 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024
)

var (
Expand Down Expand Up @@ -88,6 +91,7 @@ var (
var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
reannounceInterval = time.Minute // Time interval to check for reannounce transactions
)

var (
Expand Down Expand Up @@ -152,7 +156,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand All @@ -169,7 +174,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -208,6 +214,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
}
if conf.ReannounceTime < time.Minute {
j75689 marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
return conf
}

Expand All @@ -219,14 +229,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
Expand Down Expand Up @@ -323,14 +334,16 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
reannounce = time.NewTicker(reannounceInterval)
j75689 marked this conversation as resolved.
Show resolved Hide resolved
journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop()
defer evict.Stop()
defer reannounce.Stop()
defer journal.Stop()

for {
Expand Down Expand Up @@ -378,6 +391,33 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()

case <-reannounce.C:
pool.mu.RLock()
j75689 marked this conversation as resolved.
Show resolved Hide resolved
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}

for _, tx := range list.Flatten() {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
if len(txs) >= txReannoMaxNum {
return txs
}
}
}
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
pool.reannoTxFeed.Send(ReannoTxsEvent{reannoTxs})
}

// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
Expand Down Expand Up @@ -412,6 +452,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscripti
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- ReannoTxsEvent) event.Subscription {
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down
41 changes: 41 additions & 0 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,47 @@ func TestTransactionSlotCount(t *testing.T) {
}
}

// Tests the local pending transaction announced again correctly.
func TestTransactionPendingReannouce(t *testing.T) {
t.Parallel()

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}

config := testTxPoolConfig
// This ReannounceTime will be modified to time.Minute when creating tx_pool.
config.ReannounceTime = time.Second
reannounceInterval = time.Second

pool := NewTxPool(config, params.TestChainConfig, blockchain)
// Modify ReannounceTime to trigger quicker.
pool.config.ReannounceTime = time.Second
defer pool.Stop()

key, _ := crypto.GenerateKey()
account := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.AddBalance(account, big.NewInt(1000000))

events := make(chan ReannoTxsEvent, testTxPoolConfig.AccountQueue)
sub := pool.reannoTxFeed.Subscribe(events)
defer sub.Unsubscribe()

// Generate a batch of transactions and add to tx_pool locally.
txs := make([]*types.Transaction, 0, testTxPoolConfig.AccountQueue)
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
txs = append(txs, transaction(i, 100000, key))
}
pool.AddLocals(txs)

select {
case ev := <-events:
t.Logf("received reannouce event, txs length: %d", len(ev.Txs))
case <-time.After(5 * time.Second):
t.Errorf("reannouce event not fired")
}
}

// Benchmarks the speed of validating the contents of the pending queue of the
// transaction pool.
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
Expand Down
5 changes: 5 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type TxData interface {
setSignatureValues(chainID, v, r, s *big.Int)
}

// Time returns transaction's time
func (tx *Transaction) Time() time.Time {
return tx.time
}

// EncodeRLP implements rlp.Encoder
func (tx *Transaction) EncodeRLP(w io.Writer) error {
if tx.Type() == LegacyTxType {
Expand Down
51 changes: 51 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// SubscribeReannoTxsEvent should return an event subscription of
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -120,6 +124,8 @@ type handler struct {
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription

whitelist map[uint64]common.Hash
Expand Down Expand Up @@ -432,6 +438,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// announce local pending transactions again
h.wg.Add(1)
h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh)
go h.txReannounceLoop()

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
Expand All @@ -445,6 +457,7 @@ func (h *handler) Start(maxPeers int) {

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit chainSync and txsync64.
Expand Down Expand Up @@ -549,6 +562,31 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
"tx packs", directPeers, "broadcast txs", directCount)
}

// ReannounceTransactions will announce a batch of local pending transactions
// to a square root of all peers.
func (h *handler) ReannounceTransactions(txs types.Transactions) {
var (
annoCount int // Count of announcements made
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
)

// Announce transactions hash to a batch of peers
peersCount := uint(math.Sqrt(float64(h.peers.len())))
peers := h.peers.headPeers(peersCount)
for _, tx := range txs {
for _, peer := range peers {
annos[peer] = append(annos[peer], tx.Hash())
}
}

for peer, hashes := range annos {
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction reannounce", "txs", len(txs),
"announce packs", peersCount, "announced hashes", annoCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand All @@ -573,3 +611,16 @@ func (h *handler) txBroadcastLoop() {
}
}
}

// txReannounceLoop announces local pending transactions to connected peers again.
func (h *handler) txReannounceLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.reannoTxsCh:
h.ReannounceTransactions(event.Txs)
case <-h.reannoTxsSub.Err():
return
}
}
}
53 changes: 53 additions & 0 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
}
}

// Tests that local pending transactions get propagated to peers.
func TestTransactionPendingReannounce(t *testing.T) {
t.Parallel()

// Create a source handler to announce transactions from and a sink handler
// to receive them.
source := newTestHandler()
defer source.close()

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(sink.handler), peer)
})

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

txs[nonce] = tx
}
source.txpool.ReannouceTransactions(txs)

for arrived := 0; arrived < len(txs); {
select {
case event := <-txCh:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
}
}
}

// Tests that post eth protocol handshake, clients perform a mutual checkpoint
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
Expand Down
Loading