Skip to content

Commit f9650b0

Browse files
holimankaralabe
authored andcommitted
eth/downloader: refactor downloader + queue
downloader, fetcher: throttle-metrics, fetcher filter improvements, standalone resultcache downloader: more accurate deliverytime calculation, less mem overhead in state requests downloader/queue: increase underlying buffer of results, new throttle mechanism eth/downloader: updates to tests eth/downloader: fix up some review concerns eth/downloader/queue: minor fixes eth/downloader: minor fixes after review call eth/downloader: testcases for queue.go eth/downloader: minor change, don't set progress unless progress... eth/downloader: fix flaw which prevented useless peers from being dropped eth/downloader: try to fix tests eth/downloader: verify non-deliveries against advertised remote head eth/downloader: fix flaw with checking closed-status causing hang eth/downloader: hashing avoidance eth/downloader: review concerns + simplify resultcache and queue eth/downloader: add back some locks, address review concerns downloader/queue: fix remaining lock flaw
1 parent 4c268e6 commit f9650b0

File tree

11 files changed

+1116
-360
lines changed

11 files changed

+1116
-360
lines changed

core/types/block.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,17 @@ func rlpHash(x interface{}) (h common.Hash) {
147147
return h
148148
}
149149

150+
// EmptyBody returns true if there is no additional 'body' to complete the header
151+
// that is: no transactions and no uncles.
152+
func (h *Header) EmptyBody() bool {
153+
return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
154+
}
155+
156+
// EmptyReceipts returns true if there are no receipts for this header/block.
157+
func (h *Header) EmptyReceipts() bool {
158+
return h.ReceiptHash == EmptyRootHash
159+
}
160+
150161
// Body is a simple (mutable, non-safe) data container for storing and moving
151162
// a block's data contents (transactions and uncles) together.
152163
type Body struct {

eth/downloader/downloader.go

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
219219
stateBloom: stateBloom,
220220
mux: mux,
221221
checkpoint: checkpoint,
222-
queue: newQueue(),
222+
queue: newQueue(blockCacheItems),
223223
peers: newPeerSet(),
224224
rttEstimate: uint64(rttMaxEstimate),
225225
rttConfidence: uint64(1000000),
@@ -370,7 +370,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
370370
d.stateBloom.Close()
371371
}
372372
// Reset the queue, peer set and wake channels to clean any internal leftover state
373-
d.queue.Reset()
373+
d.queue.Reset(blockCacheItems)
374374
d.peers.Reset()
375375

376376
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
@@ -597,8 +597,10 @@ func (d *Downloader) Terminate() {
597597
default:
598598
close(d.quitCh)
599599
}
600+
if d.stateBloom != nil {
601+
d.stateBloom.Close()
602+
}
600603
d.quitLock.Unlock()
601-
602604
// Cancel any pending download requests
603605
d.Cancel()
604606
}
@@ -629,7 +631,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
629631
// Make sure the peer actually gave something valid
630632
headers := packet.(*headerPack).headers
631633
if len(headers) != 1 {
632-
p.log.Debug("Multiple headers for single request", "headers", len(headers))
634+
p.log.Warn("Multiple headers for single request", "headers", len(headers))
633635
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
634636
}
635637
head := headers[0]
@@ -866,7 +868,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
866868
// Make sure the peer actually gave something valid
867869
headers := packer.(*headerPack).headers
868870
if len(headers) != 1 {
869-
p.log.Debug("Multiple headers for single request", "headers", len(headers))
871+
p.log.Warn("Multiple headers for single request", "headers", len(headers))
870872
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
871873
}
872874
arrived = true
@@ -890,7 +892,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
890892
}
891893
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
892894
if header.Number.Uint64() != check {
893-
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
895+
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
894896
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
895897
}
896898
start = check
@@ -1106,17 +1108,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
11061108
pack := packet.(*headerPack)
11071109
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
11081110
}
1109-
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
1110-
throttle = func() bool { return false }
1111-
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
1112-
return d.queue.ReserveHeaders(p, count), false, nil
1111+
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
1112+
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
1113+
return d.queue.ReserveHeaders(p, count), false, false
11131114
}
11141115
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
11151116
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
1116-
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
1117+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
1118+
p.SetHeadersIdle(accepted, deliveryTime)
1119+
}
11171120
)
11181121
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
1119-
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
1122+
d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
11201123
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
11211124

11221125
log.Debug("Skeleton fill terminated", "err", err)
@@ -1139,10 +1142,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
11391142
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
11401143
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
11411144
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
1142-
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
1145+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
11431146
)
11441147
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
1145-
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
1148+
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
11461149
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
11471150

11481151
log.Debug("Block body download terminated", "err", err)
@@ -1163,10 +1166,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
11631166
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
11641167
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
11651168
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
1166-
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
1169+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
1170+
p.SetReceiptsIdle(accepted, deliveryTime)
1171+
}
11671172
)
11681173
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
1169-
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
1174+
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
11701175
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
11711176

11721177
log.Debug("Transaction receipt download terminated", "err", err)
@@ -1199,14 +1204,13 @@ func (d *Downloader) fetchReceipts(from uint64) error {
11991204
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
12001205
// - kind: textual label of the type being downloaded to display in log messages
12011206
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1202-
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
1207+
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
12031208
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
1204-
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
1209+
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
12051210

12061211
// Create a ticker to detect expired retrieval tasks
12071212
ticker := time.NewTicker(100 * time.Millisecond)
12081213
defer ticker.Stop()
1209-
12101214
update := make(chan struct{}, 1)
12111215

12121216
// Prepare the queue and fetch block parts until the block header fetcher's done
@@ -1217,6 +1221,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12171221
return errCanceled
12181222

12191223
case packet := <-deliveryCh:
1224+
deliveryTime := time.Now()
12201225
// If the peer was previously banned and failed to deliver its pack
12211226
// in a reasonable time frame, ignore its message.
12221227
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
@@ -1229,7 +1234,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12291234
// caused by a timed out request which came through in the end), set it to
12301235
// idle. If the delivery's stale, the peer should have already been idled.
12311236
if !errors.Is(err, errStaleDelivery) {
1232-
setIdle(peer, accepted)
1237+
setIdle(peer, accepted, deliveryTime)
12331238
}
12341239
// Issue a log to the user to see what's going on
12351240
switch {
@@ -1282,7 +1287,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12821287
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
12831288
if fails > 2 {
12841289
peer.log.Trace("Data delivery timed out", "type", kind)
1285-
setIdle(peer, 0)
1290+
setIdle(peer, 0, time.Now())
12861291
} else {
12871292
peer.log.Debug("Stalling delivery, dropping", "type", kind)
12881293

@@ -1317,27 +1322,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13171322
// Send a download request to all idle peers, until throttled
13181323
progressed, throttled, running := false, false, inFlight()
13191324
idles, total := idle()
1320-
1325+
pendCount := pending()
13211326
for _, peer := range idles {
13221327
// Short circuit if throttling activated
1323-
if throttle() {
1324-
throttled = true
1328+
if throttled {
13251329
break
13261330
}
13271331
// Short circuit if there is no more available task.
1328-
if pending() == 0 {
1332+
if pendCount = pending(); pendCount == 0 {
13291333
break
13301334
}
13311335
// Reserve a chunk of fetches for a peer. A nil can mean either that
13321336
// no more headers are available, or that the peer is known not to
13331337
// have them.
1334-
request, progress, err := reserve(peer, capacity(peer))
1335-
if err != nil {
1336-
return err
1337-
}
1338+
request, progress, throttle := reserve(peer, capacity(peer))
13381339
if progress {
13391340
progressed = true
13401341
}
1342+
if throttle {
1343+
throttled = true
1344+
throttleCounter.Inc(1)
1345+
}
13411346
if request == nil {
13421347
continue
13431348
}
@@ -1362,7 +1367,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13621367
}
13631368
// Make sure that we have peers available for fetching. If all peers have been tried
13641369
// and all failed throw an error
1365-
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
1370+
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
13661371
return errPeersUnavailable
13671372
}
13681373
}
@@ -1374,8 +1379,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13741379
// queue until the stream ends or a failure occurs.
13751380
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
13761381
// Keep a count of uncertain headers to roll back
1377-
var rollback []*types.Header
1378-
mode := d.getMode()
1382+
var (
1383+
rollback []*types.Header
1384+
rollbackErr error
1385+
mode = d.getMode()
1386+
)
13791387
defer func() {
13801388
if len(rollback) > 0 {
13811389
// Flatten the headers and roll them back
@@ -1397,7 +1405,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
13971405
log.Warn("Rolled back headers", "count", len(hashes),
13981406
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
13991407
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
1400-
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
1408+
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
14011409
}
14021410
}()
14031411

@@ -1407,6 +1415,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14071415
for {
14081416
select {
14091417
case <-d.cancelCh:
1418+
rollbackErr = errCanceled
14101419
return errCanceled
14111420

14121421
case headers := <-d.headerProcCh:
@@ -1460,6 +1469,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14601469
// Terminate if something failed in between processing chunks
14611470
select {
14621471
case <-d.cancelCh:
1472+
rollbackErr = errCanceled
14631473
return errCanceled
14641474
default:
14651475
}
@@ -1484,11 +1494,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14841494
frequency = 1
14851495
}
14861496
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1497+
rollbackErr = err
14871498
// If some headers were inserted, add them too to the rollback list
14881499
if n > 0 {
14891500
rollback = append(rollback, chunk[:n]...)
14901501
}
1491-
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
1502+
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
14921503
return fmt.Errorf("%w: %v", errInvalidChain, err)
14931504
}
14941505
// All verifications passed, store newly found uncertain headers
@@ -1503,14 +1514,15 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
15031514
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
15041515
select {
15051516
case <-d.cancelCh:
1517+
rollbackErr = errCanceled
15061518
return errCanceled
15071519
case <-time.After(time.Second):
15081520
}
15091521
}
15101522
// Otherwise insert the headers for content retrieval
15111523
inserts := d.queue.Schedule(chunk, origin)
15121524
if len(inserts) != len(chunk) {
1513-
log.Debug("Stale headers")
1525+
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
15141526
return fmt.Errorf("%w: stale headers", errBadPeer)
15151527
}
15161528
}
@@ -1680,6 +1692,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
16801692
}
16811693

16821694
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
1695+
if len(results) == 0 {
1696+
return nil, nil, nil
1697+
}
1698+
if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
1699+
// the pivot is somewhere in the future
1700+
return nil, results, nil
1701+
}
1702+
// This can also be optimized, but only happens very seldom
16831703
for _, result := range results {
16841704
num := result.Header.Number.Uint64()
16851705
switch {

0 commit comments

Comments
 (0)