Skip to content

Commit d964f78

Browse files
authored
upgrade rlp, transactions process optimization (scroll-tech#111)
* upgrade rlp to go-ethereum v.14.6 * core/types: transaction and receipt encoding/decoding optimizations ethereum#27976 * core/types: use new atomic types in caches ethereum#29411, fix eth/handler bug * eth/fetcher: throttle peers which deliver many invalid transactions ethereum#25573 * core: preallocate map in tx_pool ethereum#25737 * core/txpool: protect cache with mutex ethereum#27898 * upgrade memsize to v0.0.2
1 parent 5f73e27 commit d964f78

31 files changed

+488
-190
lines changed

cmd/devp2p/internal/ethtest/helpers.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
457457
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
458458
}
459459
return nil
460+
461+
// ignore tx announcements from previous tests
460462
case *NewPooledTransactionHashes:
461-
// ignore tx announcements from previous tests
462463
continue
464+
case *Transactions:
465+
continue
466+
463467
default:
464468
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
465469
}

cmd/devp2p/internal/ethtest/suite.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,9 +778,13 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
778778
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg))
779779
}
780780
return
781+
781782
// ignore propagated txs from previous tests
782783
case *NewPooledTransactionHashes:
783784
continue
785+
case *Transactions:
786+
continue
787+
784788
// ignore block announcements from previous tests
785789
case *NewBlockHashes:
786790
continue

cmd/devp2p/internal/ethtest/transaction.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/scroll-tech/go-ethereum/params"
3030
)
3131

32-
//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
32+
// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
3333
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
3434

3535
func (s *Suite) sendSuccessfulTxs(t *utesting.T, isEth66 bool) error {
@@ -200,10 +200,12 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
200200
}
201201
// update nonce
202202
nonce = txs[len(txs)-1].Nonce()
203-
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated
203+
204+
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
205+
// all txs should be announced within a couple announcements.
204206
recvHashes := make([]common.Hash, 0)
205-
// all txs should be announced within 3 announcements
206-
for i := 0; i < 3; i++ {
207+
208+
for i := 0; i < 20; i++ {
207209
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
208210
case *Transactions:
209211
for _, tx := range *msg {

core/tx_list.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"container/heap"
2121
"math"
2222
"math/big"
23+
"slices"
2324
"sort"
2425
"sync"
2526
"sync/atomic"
@@ -49,16 +50,18 @@ func (h *nonceHeap) Pop() interface{} {
4950
old := *h
5051
n := len(old)
5152
x := old[n-1]
53+
old[n-1] = 0
5254
*h = old[0 : n-1]
5355
return x
5456
}
5557

5658
// txSortedMap is a nonce->transaction hash map with a heap based index to allow
5759
// iterating over the contents in a nonce-incrementing way.
5860
type txSortedMap struct {
59-
items map[uint64]*types.Transaction // Hash map storing the transaction data
60-
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
61-
cache types.Transactions // Cache of the transactions already sorted
61+
items map[uint64]*types.Transaction // Hash map storing the transaction data
62+
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
63+
cache types.Transactions // Cache of the transactions already sorted
64+
cacheMu sync.Mutex // Mutex covering the cache
6265
}
6366

6467
// newTxSortedMap creates a new nonce-sorted transaction map.
@@ -81,7 +84,9 @@ func (m *txSortedMap) Put(tx *types.Transaction) {
8184
if m.items[nonce] == nil {
8285
heap.Push(m.index, nonce)
8386
}
87+
m.cacheMu.Lock()
8488
m.items[nonce], m.cache = tx, nil
89+
m.cacheMu.Unlock()
8590
}
8691

8792
// Forward removes all transactions from the map with a nonce lower than the
@@ -97,9 +102,11 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
97102
delete(m.items, nonce)
98103
}
99104
// If we had a cached order, shift the front
105+
m.cacheMu.Lock()
100106
if m.cache != nil {
101107
m.cache = m.cache[len(removed):]
102108
}
109+
m.cacheMu.Unlock()
103110
return removed
104111
}
105112

@@ -123,7 +130,9 @@ func (m *txSortedMap) reheap() {
123130
*m.index = append(*m.index, nonce)
124131
}
125132
heap.Init(m.index)
133+
m.cacheMu.Lock()
126134
m.cache = nil
135+
m.cacheMu.Unlock()
127136
}
128137

129138
// filter is identical to Filter, but **does not** regenerate the heap. This method
@@ -139,7 +148,9 @@ func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transac
139148
}
140149
}
141150
if len(removed) > 0 {
151+
m.cacheMu.Lock()
142152
m.cache = nil
153+
m.cacheMu.Unlock()
143154
}
144155
return removed
145156
}
@@ -153,19 +164,21 @@ func (m *txSortedMap) Cap(threshold int) types.Transactions {
153164
}
154165
// Otherwise gather and drop the highest nonce'd transactions
155166
var drops types.Transactions
156-
157-
sort.Sort(*m.index)
167+
slices.Sort(*m.index)
158168
for size := len(m.items); size > threshold; size-- {
159169
drops = append(drops, m.items[(*m.index)[size-1]])
160170
delete(m.items, (*m.index)[size-1])
161171
}
162172
*m.index = (*m.index)[:threshold]
163-
heap.Init(m.index)
173+
// The sorted m.index slice is still a valid heap, so there is no need to
174+
// reheap after deleting tail items.
164175

165176
// If we had a cache, shift the back
177+
m.cacheMu.Lock()
166178
if m.cache != nil {
167179
m.cache = m.cache[:len(m.cache)-len(drops)]
168180
}
181+
m.cacheMu.Unlock()
169182
return drops
170183
}
171184

@@ -185,7 +198,9 @@ func (m *txSortedMap) Remove(nonce uint64) bool {
185198
}
186199
}
187200
delete(m.items, nonce)
201+
m.cacheMu.Lock()
188202
m.cache = nil
203+
m.cacheMu.Unlock()
189204

190205
return true
191206
}
@@ -195,7 +210,7 @@ func (m *txSortedMap) Remove(nonce uint64) bool {
195210
// removed from the list.
196211
//
197212
// Note, all transactions with nonces lower than start will also be returned to
198-
// prevent getting into and invalid state. This is not something that should ever
213+
// prevent getting into an invalid state. This is not something that should ever
199214
// happen but better to be self correcting than failing!
200215
func (m *txSortedMap) Ready(start uint64) types.Transactions {
201216
// Short circuit if no transactions are available
@@ -209,7 +224,9 @@ func (m *txSortedMap) Ready(start uint64) types.Transactions {
209224
delete(m.items, next)
210225
heap.Pop(m.index)
211226
}
227+
m.cacheMu.Lock()
212228
m.cache = nil
229+
m.cacheMu.Unlock()
213230

214231
return ready
215232
}
@@ -220,6 +237,8 @@ func (m *txSortedMap) Len() int {
220237
}
221238

222239
func (m *txSortedMap) flatten() types.Transactions {
240+
m.cacheMu.Lock()
241+
defer m.cacheMu.Unlock()
223242
// If the sorting was not cached yet, create and cache it
224243
if m.cache == nil {
225244
m.cache = make(types.Transactions, 0, len(m.items))
@@ -604,6 +623,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool
604623

605624
// Discard finds a number of most underpriced transactions, removes them from the
606625
// priced list and returns them for further removal from the entire pool.
626+
// If noPending is set to true, we will only consider the floating list
607627
//
608628
// Note local transaction won't be considered for eviction.
609629
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {

core/tx_pool.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,11 +510,11 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
510510
pool.mu.Lock()
511511
defer pool.mu.Unlock()
512512

513-
pending := make(map[common.Address]types.Transactions)
513+
pending := make(map[common.Address]types.Transactions, len(pool.pending))
514514
for addr, list := range pool.pending {
515515
pending[addr] = list.Flatten()
516516
}
517-
queued := make(map[common.Address]types.Transactions)
517+
queued := make(map[common.Address]types.Transactions, len(pool.queue))
518518
for addr, list := range pool.queue {
519519
queued[addr] = list.Flatten()
520520
}
@@ -1677,7 +1677,7 @@ type accountSet struct {
16771677
// derivations.
16781678
func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
16791679
as := &accountSet{
1680-
accounts: make(map[common.Address]struct{}),
1680+
accounts: make(map[common.Address]struct{}, len(addrs)),
16811681
signer: signer,
16821682
}
16831683
for _, addr := range addrs {

core/types/hashing.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package types
1818

1919
import (
2020
"bytes"
21+
"fmt"
22+
"math"
2123
"sync"
2224

2325
"golang.org/x/crypto/sha3"
@@ -37,6 +39,22 @@ var encodeBufferPool = sync.Pool{
3739
New: func() interface{} { return new(bytes.Buffer) },
3840
}
3941

42+
// getPooledBuffer retrieves a buffer from the pool and creates a byte slice of the
43+
// requested size from it.
44+
//
45+
// The caller should return the *bytes.Buffer object back into encodeBufferPool after use!
46+
// The returned byte slice must not be used after returning the buffer.
47+
func getPooledBuffer(size uint64) ([]byte, *bytes.Buffer, error) {
48+
if size > math.MaxInt {
49+
return nil, nil, fmt.Errorf("can't get buffer of size %d", size)
50+
}
51+
buf := encodeBufferPool.Get().(*bytes.Buffer)
52+
buf.Reset()
53+
buf.Grow(int(size))
54+
b := buf.Bytes()[:int(size)]
55+
return b, buf, nil
56+
}
57+
4058
// rlpHash encodes x and hashes the encoded bytes.
4159
func rlpHash(x interface{}) (h common.Hash) {
4260
sha := hasherPool.Get().(crypto.KeccakState)

core/types/receipt.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ var (
3838
receiptStatusSuccessfulRLP = []byte{0x01}
3939
)
4040

41-
// This error is returned when a typed receipt is decoded, but the string is empty.
42-
var errEmptyTypedReceipt = errors.New("empty typed receipt bytes")
41+
var errShortTypedReceipt = errors.New("typed receipt too short")
4342

4443
const (
4544
// ReceiptStatusFailed is the status code of a transaction if execution failed.
@@ -191,7 +190,7 @@ func (r *Receipt) MarshalBinary() ([]byte, error) {
191190
// DecodeRLP implements rlp.Decoder, and loads the consensus fields of a receipt
192191
// from an RLP stream.
193192
func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
194-
kind, _, err := s.Kind()
193+
kind, size, err := s.Kind()
195194
switch {
196195
case err != nil:
197196
return err
@@ -203,15 +202,19 @@ func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
203202
}
204203
r.Type = LegacyTxType
205204
return r.setFromRLP(dec)
206-
case kind == rlp.String:
205+
case kind == rlp.Byte:
206+
return errShortTypedReceipt
207+
default:
207208
// It's an EIP-2718 typed tx receipt.
208-
b, err := s.Bytes()
209+
b, buf, err := getPooledBuffer(size)
209210
if err != nil {
210211
return err
211212
}
213+
defer encodeBufferPool.Put(buf)
214+
if err := s.ReadBytes(b); err != nil {
215+
return err
216+
}
212217
return r.decodeTyped(b)
213-
default:
214-
return rlp.ErrExpectedList
215218
}
216219
}
217220

@@ -234,8 +237,8 @@ func (r *Receipt) UnmarshalBinary(b []byte) error {
234237

235238
// decodeTyped decodes a typed receipt from the canonical format.
236239
func (r *Receipt) decodeTyped(b []byte) error {
237-
if len(b) == 0 {
238-
return errEmptyTypedReceipt
240+
if len(b) <= 1 {
241+
return errShortTypedReceipt
239242
}
240243
switch b[0] {
241244
case DynamicFeeTxType, AccessListTxType, BlobTxType, L1MessageTxType:

core/types/receipt_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestDecodeEmptyTypedReceipt(t *testing.T) {
8686
input := []byte{0x80}
8787
var r Receipt
8888
err := rlp.DecodeBytes(input, &r)
89-
if err != errEmptyTypedReceipt {
89+
if err != errShortTypedReceipt {
9090
t.Fatal("wrong error:", err)
9191
}
9292
}

0 commit comments

Comments
 (0)