Skip to content

Commit a541fbe

Browse files
authored
eth/protocols/eth: simplify peer known block/txs caches (ethereum#23649)
* Simplify peer known block/txns cache * Address minor changes * Add more minor comments * Minor changes from review
1 parent 3531ca2 commit a541fbe

File tree

2 files changed

+72
-52
lines changed

2 files changed

+72
-52
lines changed

eth/protocols/eth/peer.go

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ type Peer struct {
7575
head common.Hash // Latest advertised head block hash
7676
td *big.Int // Latest advertised head block total difficulty
7777

78-
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
78+
knownBlocks *knownCache // Set of block hashes known to be known by this peer
7979
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
8080
queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer
8181

8282
txpool TxPool // Transaction pool used by the broadcasters for liveness checks
83-
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
83+
knownTxs *knownCache // Set of transaction hashes known to be known by this peer
8484
txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
8585
txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests
8686

@@ -96,8 +96,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
9696
Peer: p,
9797
rw: rw,
9898
version: version,
99-
knownTxs: mapset.NewSet(),
100-
knownBlocks: mapset.NewSet(),
99+
knownTxs: newKnownCache(maxKnownTxs),
100+
knownBlocks: newKnownCache(maxKnownBlocks),
101101
queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
102102
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
103103
txBroadcast: make(chan []common.Hash),
@@ -162,19 +162,13 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool {
162162
// never be propagated to this particular peer.
163163
func (p *Peer) markBlock(hash common.Hash) {
164164
// If we reached the memory allowance, drop a previously known block hash
165-
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
166-
p.knownBlocks.Pop()
167-
}
168165
p.knownBlocks.Add(hash)
169166
}
170167

171168
// markTransaction marks a transaction as known for the peer, ensuring that it
172169
// will never be propagated to this particular peer.
173170
func (p *Peer) markTransaction(hash common.Hash) {
174171
// If we reached the memory allowance, drop a previously known transaction hash
175-
for p.knownTxs.Cardinality() >= maxKnownTxs {
176-
p.knownTxs.Pop()
177-
}
178172
p.knownTxs.Add(hash)
179173
}
180174

@@ -189,9 +183,6 @@ func (p *Peer) markTransaction(hash common.Hash) {
189183
// tests that directly send messages without having to do the asyn queueing.
190184
func (p *Peer) SendTransactions(txs types.Transactions) error {
191185
// Mark all the transactions as known, but ensure we don't overflow our limits
192-
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
193-
p.knownTxs.Pop()
194-
}
195186
for _, tx := range txs {
196187
p.knownTxs.Add(tx.Hash())
197188
}
@@ -205,12 +196,7 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
205196
select {
206197
case p.txBroadcast <- hashes:
207198
// Mark all the transactions as known, but ensure we don't overflow our limits
208-
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
209-
p.knownTxs.Pop()
210-
}
211-
for _, hash := range hashes {
212-
p.knownTxs.Add(hash)
213-
}
199+
p.knownTxs.Add(hashes...)
214200
case <-p.term:
215201
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
216202
}
@@ -224,12 +210,7 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
224210
// not be managed directly.
225211
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
226212
// Mark all the transactions as known, but ensure we don't overflow our limits
227-
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
228-
p.knownTxs.Pop()
229-
}
230-
for _, hash := range hashes {
231-
p.knownTxs.Add(hash)
232-
}
213+
p.knownTxs.Add(hashes...)
233214
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
234215
}
235216

@@ -240,12 +221,7 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
240221
select {
241222
case p.txAnnounce <- hashes:
242223
// Mark all the transactions as known, but ensure we don't overflow our limits
243-
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
244-
p.knownTxs.Pop()
245-
}
246-
for _, hash := range hashes {
247-
p.knownTxs.Add(hash)
248-
}
224+
p.knownTxs.Add(hashes...)
249225
case <-p.term:
250226
p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
251227
}
@@ -254,12 +230,8 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
254230
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
255231
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
256232
// Mark all the transactions as known, but ensure we don't overflow our limits
257-
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
258-
p.knownTxs.Pop()
259-
}
260-
for _, hash := range hashes {
261-
p.knownTxs.Add(hash)
262-
}
233+
p.knownTxs.Add(hashes...)
234+
263235
// Not packed into PooledTransactionsPacket to avoid RLP decoding
264236
return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
265237
RequestId: id,
@@ -271,12 +243,8 @@ func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs [
271243
// a hash notification.
272244
func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
273245
// Mark all the block hashes as known, but ensure we don't overflow our limits
274-
for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
275-
p.knownBlocks.Pop()
276-
}
277-
for _, hash := range hashes {
278-
p.knownBlocks.Add(hash)
279-
}
246+
p.knownBlocks.Add(hashes...)
247+
280248
request := make(NewBlockHashesPacket, len(hashes))
281249
for i := 0; i < len(hashes); i++ {
282250
request[i].Hash = hashes[i]
@@ -292,9 +260,6 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
292260
select {
293261
case p.queuedBlockAnns <- block:
294262
// Mark all the block hash as known, but ensure we don't overflow our limits
295-
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
296-
p.knownBlocks.Pop()
297-
}
298263
p.knownBlocks.Add(block.Hash())
299264
default:
300265
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
@@ -304,9 +269,6 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
304269
// SendNewBlock propagates an entire block to a remote peer.
305270
func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
306271
// Mark all the block hash as known, but ensure we don't overflow our limits
307-
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
308-
p.knownBlocks.Pop()
309-
}
310272
p.knownBlocks.Add(block.Hash())
311273
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
312274
Block: block,
@@ -320,9 +282,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
320282
select {
321283
case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
322284
// Mark all the block hash as known, but ensure we don't overflow our limits
323-
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
324-
p.knownBlocks.Pop()
325-
}
326285
p.knownBlocks.Add(block.Hash())
327286
default:
328287
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
@@ -465,3 +424,37 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
465424
GetPooledTransactionsPacket: hashes,
466425
})
467426
}
427+
428+
// knownCache is a cache for known hashes.
429+
type knownCache struct {
430+
hashes mapset.Set
431+
max int
432+
}
433+
434+
// newKnownCache creates a new knownCache with a max capacity.
435+
func newKnownCache(max int) *knownCache {
436+
return &knownCache{
437+
max: max,
438+
hashes: mapset.NewSet(),
439+
}
440+
}
441+
442+
// Add adds a list of elements to the set.
443+
func (k *knownCache) Add(hashes ...common.Hash) {
444+
for k.hashes.Cardinality() > max(0, k.max-len(hashes)) {
445+
k.hashes.Pop()
446+
}
447+
for _, hash := range hashes {
448+
k.hashes.Add(hash)
449+
}
450+
}
451+
452+
// Contains returns whether the given item is in the set.
453+
func (k *knownCache) Contains(hash common.Hash) bool {
454+
return k.hashes.Contains(hash)
455+
}
456+
457+
// Cardinality returns the number of elements in the set.
458+
func (k *knownCache) Cardinality() int {
459+
return k.hashes.Cardinality()
460+
}

eth/protocols/eth/peer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ package eth
2121

2222
import (
2323
"crypto/rand"
24+
"testing"
2425

26+
"github.com/ethereum/go-ethereum/common"
2527
"github.com/ethereum/go-ethereum/p2p"
2628
"github.com/ethereum/go-ethereum/p2p/enode"
2729
)
@@ -59,3 +61,28 @@ func (p *testPeer) close() {
5961
p.Peer.Close()
6062
p.app.Close()
6163
}
64+
65+
func TestPeerSet(t *testing.T) {
66+
size := 5
67+
s := newKnownCache(size)
68+
69+
// add 10 items
70+
for i := 0; i < size*2; i++ {
71+
s.Add(common.Hash{byte(i)})
72+
}
73+
74+
if s.Cardinality() != size {
75+
t.Fatalf("wrong size, expected %d but found %d", size, s.Cardinality())
76+
}
77+
78+
vals := []common.Hash{}
79+
for i := 10; i < 20; i++ {
80+
vals = append(vals, common.Hash{byte(i)})
81+
}
82+
83+
// add item in batch
84+
s.Add(vals...)
85+
if s.Cardinality() < size {
86+
t.Fatalf("bad size")
87+
}
88+
}

0 commit comments

Comments
 (0)