Skip to content

Commit

Permalink
- fetch transactions from a peer in the order they were announced to …
Browse files Browse the repository at this point in the history
…minimize nonce-gaps (which cause blob txs to be rejected)

- don't wait on fetching blob transactions after announcement is received, since they are not broadcast
  • Loading branch information
roberto-bayardo committed Jul 22, 2024
1 parent 380688c commit 3ff3634
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 215 deletions.
11 changes: 10 additions & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,16 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err: %v", err)
} else if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
if code == 24 {
// sometimes we'll get a blob transaction hashes announcement before the disconnect
// because blob transactions are scheduled to be fetched right away.
if code, _, err = conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err)
}
}
if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
}
}
conn.Close()
}
Expand Down
134 changes: 63 additions & 71 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fetcher

import (
"bytes"
"errors"
"fmt"
"math"
Expand All @@ -35,7 +34,7 @@ import (
)

const (
// maxTxAnnounces is the maximum number of unique transaction a peer
// maxTxAnnounces is the maximum number of unique transactions a peer
// can announce in a short time.
maxTxAnnounces = 4096

Expand Down Expand Up @@ -114,14 +113,17 @@ var errTerminated = errors.New("terminated")
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
metas []txMetadata // Batch of metadatas associated with the hashes
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling ('kind' & 'size'), plus an extra field
// ('arrival') to keep track of its order of arrival. 'size==0' can be used to
// test for pre-eth/68 announcements. In this case, kind will also be 0.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes, or 0 if the announcement didn't include metadata
arrival uint64 // Value that can be used to sort announcements by order of arrival
}

// txRequest represents an in-flight transaction retrieval request destined to
Expand Down Expand Up @@ -159,7 +161,7 @@ type txDrop struct {
// The invariants of the fetcher are:
// - Each tracked transaction (hash) must only be present in one of the
// three stages. This ensures that the fetcher operates akin to a finite
// state automata and there's do data leak.
// state automata and there's no data leak.
// - Each peer that announced transactions may be scheduled retrievals, but
// only ever one concurrently. This ensures we can immediately know what is
// missing from a reply and reschedule it.
Expand All @@ -171,16 +173,18 @@ type TxFetcher struct {

underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

counter uint64 // counter used to assign arrival order to tx announcements

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]txMetadata // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand Down Expand Up @@ -218,8 +222,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
waitslots: make(map[string]map[common.Hash]txMetadata),
announces: make(map[string]map[common.Hash]txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand Down Expand Up @@ -247,7 +251,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// loop, so anything caught here is time saved internally.
var (
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))
unknownMetas = make([]txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
Expand All @@ -261,10 +265,15 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
default:
unknownHashes = append(unknownHashes, hash)
if types == nil {
unknownMetas = append(unknownMetas, nil)
unknownMetas = append(unknownMetas, txMetadata{arrival: f.counter})
} else {
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
if sizes[i] == 0 {
// invalid size parameter, return error
return fmt.Errorf("announcement from tx %x had an invalid 0 size metadata", hash)
}
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i], arrival: f.counter})
}
f.counter++
}
}
txAnnounceKnownMeter.Mark(duplicate)
Expand All @@ -275,6 +284,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
return nil
}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
f.counter++
select {
case f.notify <- announce:
return nil
Expand Down Expand Up @@ -445,7 +455,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -458,7 +468,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -477,18 +487,26 @@ func (f *TxFetcher) loop() {
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
}
continue
}
// Transaction unknown to the fetcher, insert it into the waiting list
f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
f.waittime[hash] = f.clock.Now()
if ann.metas[i].kind == types.BlobTxType {
// blob transactions are never broadcast, so to force them
// to be fetched immediately we pretend they arrived
// earlier.
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
idleWait = true // may need to reschedule fetcher due to "time travel"
} else {
f.waittime[hash] = f.clock.Now()
}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand Down Expand Up @@ -516,7 +534,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
f.announces[peer] = map[common.Hash]txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -590,7 +608,7 @@ func (f *TxFetcher) loop() {
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta := txset[hash]; meta != nil {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand All @@ -616,7 +634,7 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta := txset[hash]; meta != nil {
if meta, ok := txset[hash]; ok && meta.size != 0 {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
Expand Down Expand Up @@ -873,7 +891,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
// If the transaction is already fetching, skip to the next one
if _, ok := f.fetching[hash]; ok {
return true
Expand All @@ -892,7 +910,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
}
if meta != nil { // Only set eth/68 and upwards
if meta.size != 0 { // Only set eth/68 and upwards
bytes += uint64(meta.size)
if bytes >= maxTxRetrievalSize {
return false
Expand Down Expand Up @@ -943,28 +961,25 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
}
}

// forEachAnnounce does a range loop over a map of announcements in production,
// but during testing it does a deterministic sorted random to allow reproducing
// issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash, meta := range announces {
if !do(hash, meta) {
return
}
}
return
// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of mempool nonce-gaps, which result in blob
// transactions being rejected by the mempool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]txMetadata, do func(hash common.Hash, meta txMetadata) bool) {
type announcement struct {
hash common.Hash
meta txMetadata
}
// We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(announces))
for hash := range announces {
list = append(list, hash)
// process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, metadata := range announces {
list = append(list, announcement{hash: hash, meta: metadata})
}
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash, announces[hash]) {
sort.Slice(list, func(i, j int) bool {
return list[i].meta.arrival < list[j].meta.arrival
})
for i := range list {
if !do(list[i].hash, list[i].meta) {
return
}
}
Expand All @@ -980,26 +995,3 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)]
}
}

// sortHashes sorts a slice of hashes in ascending byte order. This method is
// only used in tests in order to simulate random map iteration but keep it
// deterministic.
func sortHashes(slice []common.Hash) {
	// Plain insertion sort: test inputs are tiny, so the quadratic cost is
	// irrelevant and determinism is all that matters.
	for i := 1; i < len(slice); i++ {
		for j := i; j > 0 && bytes.Compare(slice[j-1][:], slice[j][:]) > 0; j-- {
			slice[j-1], slice[j] = slice[j], slice[j-1]
		}
	}
}

// rotateHashes rotates the contents of a slice left by n steps. This method is
// only used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
	if len(slice) == 0 {
		return
	}
	// Normalize the shift, then rebuild the rotated order into a scratch
	// slice and copy it back over the original in place.
	k := n % len(slice)
	rotated := make([]common.Hash, 0, len(slice))
	rotated = append(rotated, slice[k:]...)
	rotated = append(rotated, slice[:k]...)
	copy(slice, rotated)
}
Loading

0 comments on commit 3ff3634

Please sign in to comment.