Skip to content

Commit 5413df1

Browse files
rjl493456442karalabe
authored andcommitted
core: fix heartbeat in txpool
core: address comment
1 parent c374447 commit 5413df1

File tree

2 files changed

+37
-90
lines changed

2 files changed

+37
-90
lines changed

core/tx_pool.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,11 @@ type TxPool struct {
232232
locals *accountSet // Set of local transaction to exempt from eviction rules
233233
journal *txJournal // Journal of local transaction to back up to disk
234234

235-
pending map[common.Address]*txList // All currently processable transactions
236-
queue map[common.Address]*txList // Queued but non-processable transactions
237-
beats map[common.Address]time.Time // Last heartbeat from each known account
238-
queuedTs map[common.Hash]time.Time // Timestamp for when queued transactions were added
239-
all *txLookup // All transactions to allow lookups
240-
priced *txPricedList // All transactions sorted by price
235+
pending map[common.Address]*txList // All currently processable transactions
236+
queue map[common.Address]*txList // Queued but non-processable transactions
237+
beats map[common.Address]time.Time // Last heartbeat from each known account
238+
all *txLookup // All transactions to allow lookups
239+
priced *txPricedList // All transactions sorted by price
241240

242241
chainHeadCh chan ChainHeadEvent
243242
chainHeadSub event.Subscription
@@ -268,7 +267,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
268267
pending: make(map[common.Address]*txList),
269268
queue: make(map[common.Address]*txList),
270269
beats: make(map[common.Address]time.Time),
271-
queuedTs: make(map[common.Hash]time.Time),
272270
all: newTxLookup(),
273271
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
274272
reqResetCh: make(chan *txpoolResetRequest),
@@ -365,12 +363,11 @@ func (pool *TxPool) loop() {
365363
}
366364
// Any non-locals old enough should be removed
367365
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
368-
for _, tx := range pool.queue[addr].Flatten() {
369-
if time.Since(pool.queuedTs[tx.Hash()]) > pool.config.Lifetime {
370-
queuedEvictionMeter.Mark(1)
371-
pool.removeTx(tx.Hash(), true)
372-
}
366+
list := pool.queue[addr].Flatten()
367+
for _, tx := range list {
368+
pool.removeTx(tx.Hash(), true)
373369
}
370+
queuedEvictionMeter.Mark(int64(len(list)))
374371
}
375372
}
376373
pool.mu.Unlock()
@@ -622,9 +619,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
622619
pool.all.Add(tx)
623620
pool.priced.Put(tx)
624621
pool.journalTx(from, tx)
625-
pool.beats[from] = time.Now()
626622
pool.queueTxEvent(tx)
627623
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
624+
625+
// Successful promotion, bump the heartbeat
626+
pool.beats[from] = time.Now()
628627
return old != nil, nil
629628
}
630629
// New transaction isn't replacing a pending one, push into queue
@@ -665,20 +664,20 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
665664
}
666665
// Discard any previous transaction and mark this
667666
if old != nil {
668-
old_hash := old.Hash()
669-
pool.all.Remove(old_hash)
667+
pool.all.Remove(old.Hash())
670668
pool.priced.Removed(1)
671-
delete(pool.queuedTs, old_hash)
672669
queuedReplaceMeter.Mark(1)
673670
} else {
674671
// Nothing was replaced, bump the queued counter
675672
queuedGauge.Inc(1)
676-
pool.queuedTs[hash] = time.Now()
677673
}
678674
if pool.all.Get(hash) == nil {
679675
pool.all.Add(tx)
680676
pool.priced.Put(tx)
681-
pool.queuedTs[hash] = time.Now()
677+
}
678+
// If we never record the heartbeat, do it right now.
679+
if _, exist := pool.beats[from]; !exist {
680+
pool.beats[from] = time.Now()
682681
}
683682
return old != nil, nil
684683
}
@@ -711,7 +710,6 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
711710
// An older transaction was better, discard this
712711
pool.all.Remove(hash)
713712
pool.priced.Removed(1)
714-
delete(pool.queuedTs, hash)
715713
pendingDiscardMeter.Mark(1)
716714
return false
717715
}
@@ -730,10 +728,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
730728
pool.priced.Put(tx)
731729
}
732730
// Set the potentially new pending nonce and notify any subsystems of the new tx
733-
pool.beats[addr] = time.Now()
734-
delete(pool.queuedTs, hash)
735731
pool.pendingNonces.set(addr, tx.Nonce()+1)
736732

733+
// Successful promotion, bump the heartbeat
734+
pool.beats[addr] = time.Now()
737735
return true
738736
}
739737

@@ -923,10 +921,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
923921
if removed, _ := future.Remove(tx); removed {
924922
// Reduce the queued counter
925923
queuedGauge.Dec(1)
926-
delete(pool.queuedTs, hash)
927924
}
928925
if future.Empty() {
929926
delete(pool.queue, addr)
927+
delete(pool.beats, addr)
930928
}
931929
}
932930
}
@@ -1202,15 +1200,13 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
12021200
for _, tx := range forwards {
12031201
hash := tx.Hash()
12041202
pool.all.Remove(hash)
1205-
delete(pool.queuedTs, hash)
12061203
}
12071204
log.Trace("Removed old queued transactions", "count", len(forwards))
12081205
// Drop all transactions that are too costly (low balance or out of gas)
12091206
drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
12101207
for _, tx := range drops {
12111208
hash := tx.Hash()
12121209
pool.all.Remove(hash)
1213-
delete(pool.queuedTs, hash)
12141210
}
12151211
log.Trace("Removed unpayable queued transactions", "count", len(drops))
12161212
queuedNofundsMeter.Mark(int64(len(drops)))
@@ -1233,7 +1229,6 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
12331229
for _, tx := range caps {
12341230
hash := tx.Hash()
12351231
pool.all.Remove(hash)
1236-
delete(pool.queuedTs, hash)
12371232
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
12381233
}
12391234
queuedRateLimitMeter.Mark(int64(len(caps)))
@@ -1247,6 +1242,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
12471242
// Delete the entire queue entry if it became empty.
12481243
if list.Empty() {
12491244
delete(pool.queue, addr)
1245+
delete(pool.beats, addr)
12501246
}
12511247
}
12521248
return promoted
@@ -1431,7 +1427,6 @@ func (pool *TxPool) demoteUnexecutables() {
14311427
// Delete the entire pending entry if it became empty.
14321428
if list.Empty() {
14331429
delete(pool.pending, addr)
1434-
delete(pool.beats, addr)
14351430
}
14361431
}
14371432
}

core/tx_pool_test.go

Lines changed: 16 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ func validateTxPoolInternals(pool *TxPool) error {
109109
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
110110
return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
111111
}
112-
if queued != len(pool.queuedTs) {
113-
return fmt.Errorf("total queued transaction count %d != %d queuedTs length", queued, len(pool.queuedTs))
114-
}
115112

116113
// Ensure the next nonce to assign is the correct one
117114
for addr, txs := range pool.pending {
@@ -948,7 +945,6 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
948945
// remove current transactions and increase nonce to prepare for a reset and cleanup
949946
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
950947
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
951-
952948
<-pool.requestReset(nil, nil)
953949

954950
// make sure queue, pending are cleared
@@ -963,93 +959,49 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
963959
t.Fatalf("pool internal state corrupted: %v", err)
964960
}
965961

966-
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
967-
t.Fatalf("failed to add remote transaction: %v", err)
968-
}
962+
// Queue gapped transactions
969963
if err := pool.AddLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
970964
t.Fatalf("failed to add remote transaction: %v", err)
971965
}
972-
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
973-
t.Fatalf("failed to add remote transaction: %v", err)
974-
}
975966
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
976967
t.Fatalf("failed to add remote transaction: %v", err)
977968
}
969+
time.Sleep(5 * evictionInterval) // A half lifetime pass
978970

979-
// wait a short amount of time to add an additional future queued item to test proper eviction when
980-
// pending is removed
981-
time.Sleep(2 * evictionInterval)
982-
if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1), remote)); err != nil {
971+
// Queue executable transactions, the life cycle should be restarted.
972+
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
983973
t.Fatalf("failed to add remote transaction: %v", err)
984974
}
975+
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
976+
t.Fatalf("failed to add remote transaction: %v", err)
977+
}
978+
time.Sleep(6 * evictionInterval)
985979

986-
// Make sure future queue and pending have transactions
980+
// All gapped transactions shouldn't be kicked out
987981
pending, queued = pool.Stats()
988982
if pending != 2 {
989983
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
990984
}
991-
if queued != 3 {
985+
if queued != 2 {
992986
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
993987
}
994988
if err := validateTxPoolInternals(pool); err != nil {
995989
t.Fatalf("pool internal state corrupted: %v", err)
996990
}
997991

998-
// Trigger a reset to make sure queued items are not evicted
999-
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 3)
1000-
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 3)
1001-
<-pool.requestReset(nil, nil)
1002-
1003-
// Wait for eviction to run
1004-
time.Sleep(evictionInterval * 2)
1005-
1006-
// a pool reset, empty pending list, or demotion of pending transactions should maintain
1007-
// queued transactions for non locals and locals alike if the lifetime duration has not passed yet
1008-
pending, queued = pool.Stats()
1009-
if pending != 0 {
1010-
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
1011-
}
1012-
if queued != 3 {
1013-
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1014-
}
1015-
if err := validateTxPoolInternals(pool); err != nil {
1016-
t.Fatalf("pool internal state corrupted: %v", err)
1017-
}
1018-
1019-
// Wait for the lifetime to run for all transactions except the one that was added later
1020-
time.Sleep(evictionInterval * 7)
1021-
pending, queued = pool.Stats()
1022-
if pending != 0 {
1023-
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
1024-
}
1025-
if nolocals {
1026-
if queued != 1 {
1027-
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
1028-
}
1029-
} else {
1030-
if queued != 2 {
1031-
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1032-
}
1033-
}
1034-
1035-
if err := validateTxPoolInternals(pool); err != nil {
1036-
t.Fatalf("pool internal state corrupted: %v", err)
1037-
}
1038-
1039-
// lifetime should pass for the final transaction
1040-
time.Sleep(evictionInterval * 2)
1041-
992+
// The whole life time pass after last promotion, kick out stale transactions
993+
time.Sleep(2 * config.Lifetime)
1042994
pending, queued = pool.Stats()
1043-
if pending != 0 {
1044-
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
995+
if pending != 2 {
996+
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
1045997
}
1046998
if nolocals {
1047999
if queued != 0 {
1048-
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
1000+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
10491001
}
10501002
} else {
10511003
if queued != 1 {
1052-
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
1004+
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
10531005
}
10541006
}
10551007
if err := validateTxPoolInternals(pool); err != nil {

0 commit comments

Comments
 (0)