Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func rlpHash(x interface{}) (h common.Hash) {
return h
}

// EmptyBody returns true if there is no additional 'body' to complete the header
// that is: no transactions and no uncles.
func (h *Header) EmptyBody() bool {
return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
}

// EmptyReceipts returns true if there are no receipts for this header/block.
func (h *Header) EmptyReceipts() bool {
return h.ReceiptHash == EmptyRootHash
}

// Body is a simple (mutable, non-safe) data container for storing and moving
// a block's data contents (transactions and uncles) together.
type Body struct {
Expand Down
88 changes: 55 additions & 33 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(),
queue: newQueue(blockCacheItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
Expand Down Expand Up @@ -370,7 +370,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.stateBloom.Close()
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.queue.Reset(blockCacheItems)
d.peers.Reset()

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
Expand Down Expand Up @@ -597,6 +597,9 @@ func (d *Downloader) Terminate() {
default:
close(d.quitCh)
}
if d.stateBloom != nil {
d.stateBloom.Close()
}
d.quitLock.Unlock()

// Cancel any pending download requests
Expand Down Expand Up @@ -629,7 +632,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
head := headers[0]
Expand Down Expand Up @@ -866,7 +869,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
arrived = true
Expand All @@ -890,7 +893,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
}
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
}
start = check
Expand Down Expand Up @@ -1106,17 +1109,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
return d.queue.ReserveHeaders(p, count), false, false
}
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetHeadersIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

log.Debug("Skeleton fill terminated", "err", err)
Expand All @@ -1139,10 +1143,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
)
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

log.Debug("Block body download terminated", "err", err)
Expand All @@ -1163,10 +1167,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

log.Debug("Transaction receipt download terminated", "err", err)
Expand Down Expand Up @@ -1199,9 +1205,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
Expand All @@ -1217,6 +1223,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
return errCanceled

case packet := <-deliveryCh:
deliveryTime := time.Now()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
Expand All @@ -1229,7 +1236,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted)
setIdle(peer, accepted, deliveryTime)
}
// Issue a log to the user to see what's going on
switch {
Expand Down Expand Up @@ -1282,7 +1289,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
peer.log.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0)
setIdle(peer, 0, time.Now())
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)

Expand Down Expand Up @@ -1317,27 +1324,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// Send a download request to all idle peers, until throttled
progressed, throttled, running := false, false, inFlight()
idles, total := idle()

pendCount := pending()
for _, peer := range idles {
// Short circuit if throttling activated
if throttle() {
throttled = true
if throttled {
break
}
// Short circuit if there is no more available task.
if pending() == 0 {
if pendCount = pending(); pendCount == 0 {
break
}
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
request, progress, err := reserve(peer, capacity(peer))
if err != nil {
return err
}
request, progress, throttle := reserve(peer, capacity(peer))
if progress {
progressed = true
}
if throttle {
throttled = true
throttleCounter.Inc(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my PR from today I broke out of the for loop (i.e. if it's throttled, there's nothing for other peers either), seems cleaner than continuing. In this PR won't request be nil in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be that there were a couple of items before throttling hit. So it will continue, and on next iteration it will exit.

}
if request == nil {
continue
}
Expand All @@ -1362,7 +1369,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
return errPeersUnavailable
}
}
Expand All @@ -1374,8 +1381,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var rollback []*types.Header
mode := d.getMode()
var (
rollback []*types.Header
rollbackErr error
mode = d.getMode()
)
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
Expand All @@ -1397,7 +1407,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
log.Warn("Rolled back headers", "count", len(hashes),
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()

Expand All @@ -1407,6 +1417,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled

case headers := <-d.headerProcCh:
Expand Down Expand Up @@ -1460,6 +1471,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
default:
}
Expand All @@ -1484,11 +1496,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
rollbackErr = err
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, chunk[:n]...)
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, store newly found uncertain headers
Expand All @@ -1503,14 +1516,15 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
log.Debug("Stale headers")
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
Expand Down Expand Up @@ -1680,6 +1694,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
if len(results) == 0 {
return nil, nil, nil
}
if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
// the pivot is somewhere in the future
return nil, results, nil
}
// This can also be optimized, but only happens very seldom
for _, result := range results {
num := result.Header.Number.Uint64()
switch {
Expand Down
Loading