Skip to content

Commit a2be5f1

Browse files
committed
In this chage, fixes to peer selector and the test.
peerSelector.go: various bug fixes - introduce peerSelectorPeer to wrap the network.Peer and add peerClass information, to be able to distinguish between peers of the same address but different classes. - keep track of download failures to be able to increase the cost of each failure when failing more than succeeding. This is to evict the peer faster when constantly failing to download. - initialize rankSum and rankSamples to initialRank of the class. Otherwise, the peer rank will have a very long warmup time before relfecting the correct rank. - let resetRequestPenalty bound the rank within the class bounds. Otherwise, the penalty calculation pushes the rank out of the class bounds (bug). - getNextPeer is local to the package - getNextPeer, PeerDownloadDurationToRank and RankPeer use peerSelectorPeer instead of network.Peer - refreshAvailablePeers distinguishes between peers with the same address but of different peer class - findPeer returns the peer given the address and the peer class (instead of just the address) catchpointCatchup_test.go: - Remove comment about giving the second node all the stake, since it is not the case here. - Use the round from the catchpoint instead of guessing the round as 36. In case the following catchpoint was obtained due to race conditions, checking for round 37 will be trivial, since it will also be obtained from the catchpoint. catchpointService.go and service.go: - Update the code to use peerSelectorPeer instead of network.Peer with peerSelector peerSelector_test.go: - Update the tests to use peerSelectorPeer instead of network.Peer with peerSelector - Cleanup debugging printouts.
1 parent a9705a1 commit a2be5f1

File tree

5 files changed

+157
-141
lines changed

5 files changed

+157
-141
lines changed

catchup/catchpointService.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
281281
}
282282
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
283283
}
284-
peer, err := peerSelector.GetNextPeer()
284+
psp, err := peerSelector.getNextPeer()
285+
peer := psp.Peer
285286
if err != nil {
286287
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
287288
return cs.abort(err)
@@ -293,9 +294,9 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
293294
break
294295
}
295296
// failed to build the merkle trie for the above catchpoint file.
296-
peerSelector.RankPeer(peer, peerRankInvalidDownload)
297+
peerSelector.RankPeer(psp, peerRankInvalidDownload)
297298
} else {
298-
peerSelector.RankPeer(peer, peerRankDownloadFailed)
299+
peerSelector.RankPeer(psp, peerRankDownloadFailed)
299300
}
300301

301302
// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
@@ -345,11 +346,11 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
345346
for {
346347
attemptsCount++
347348

348-
peer := network.Peer(0)
349+
var psp *peerSelectorPeer
349350
blockDownloadDuration := time.Duration(0)
350351
if blk == nil {
351352
var stop bool
352-
blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
353+
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
353354
if stop {
354355
return err
355356
} else if blk == nil {
@@ -364,7 +365,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
364365
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
365366
// try again.
366367
blk = nil
367-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
368+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
368369
continue
369370
}
370371
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol))
@@ -376,7 +377,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
376377
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
377378
// try again.
378379
blk = nil
379-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
380+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
380381
continue
381382
}
382383
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash()))
@@ -389,7 +390,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
389390
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
390391
// try again.
391392
blk = nil
392-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
393+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
393394
continue
394395
}
395396
return cs.abort(fmt.Errorf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header"))
@@ -405,14 +406,14 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro
405406
// try again.
406407
blk = nil
407408
cs.log.Infof("processStageLastestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err)
408-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
409+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
409410
continue
410411
}
411412
return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling VerifyCatchpoint : %v", err))
412413
}
413414
// give a rank to the download, as the download was successful.
414-
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration)
415-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank)
415+
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration)
416+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank)
416417

417418
err = cs.ledgerAccessor.StoreBalancesRound(cs.ctx, blk)
418419
if err != nil {
@@ -495,11 +496,11 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
495496
}
496497
}
497498

498-
peer := network.Peer(0)
499+
var psp *peerSelectorPeer
499500
blockDownloadDuration := time.Duration(0)
500501
if blk == nil {
501502
var stop bool
502-
blk, blockDownloadDuration, peer, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
503+
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
503504
if stop {
504505
return err
505506
} else if blk == nil {
@@ -515,7 +516,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
515516
// not identical, retry download.
516517
cs.log.Warnf("processStageBlocksDownload downloaded block(%d) did not match it's successor(%d) block hash %v != %v", blk.Round(), prevBlock.Round(), blk.Hash(), prevBlock.BlockHeader.Branch)
517518
cs.updateBlockRetrievalStatistics(-1, 0)
518-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
519+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
519520
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
520521
// try again.
521522
retryCount++
@@ -528,7 +529,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
528529
if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok {
529530
cs.log.Warnf("processStageBlocksDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)
530531
cs.updateBlockRetrievalStatistics(-1, 0)
531-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
532+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
532533
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
533534
// try again.
534535
retryCount++
@@ -541,7 +542,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
541542
if !blk.ContentsMatchHeader() {
542543
cs.log.Warnf("processStageBlocksDownload: downloaded block content does not match downloaded block header")
543544
// try again.
544-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
545+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
545546
cs.updateBlockRetrievalStatistics(-1, 0)
546547
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
547548
// try again.
@@ -552,8 +553,8 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
552553
}
553554

554555
cs.updateBlockRetrievalStatistics(0, 1)
555-
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(peer, blockDownloadDuration)
556-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRank)
556+
peerRank := cs.blocksDownloadPeerSelector.PeerDownloadDurationToRank(psp, blockDownloadDuration)
557+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRank)
557558

558559
// all good, persist and move on.
559560
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
@@ -581,39 +582,40 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
581582
// fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer.
582583
// The method return stop=true if the caller should exit the current operation
583584
// If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
584-
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, peer network.Peer, stop bool, err error) {
585-
peer, err = cs.blocksDownloadPeerSelector.GetNextPeer()
585+
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
586+
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
587+
peer := psp.Peer
586588
if err != nil {
587589
err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from")
588-
return nil, time.Duration(0), peer, true, cs.abort(err)
590+
return nil, time.Duration(0), psp, true, cs.abort(err)
589591
}
590592

591593
httpPeer, validPeer := peer.(network.HTTPPeer)
592594
if !validPeer {
593595
cs.log.Warnf("fetchBlock: non-HTTP peer was provided by the peer selector")
594-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankInvalidDownload)
596+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankInvalidDownload)
595597
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
596598
// try again.
597-
return nil, time.Duration(0), peer, false, nil
599+
return nil, time.Duration(0), psp, false, nil
598600
}
599-
return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
601+
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
600602
}
601603
fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
602604
blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
603605
if err != nil {
604606
if cs.ctx.Err() != nil {
605-
return nil, time.Duration(0), peer, true, cs.stopOrAbort()
607+
return nil, time.Duration(0), psp, true, cs.stopOrAbort()
606608
}
607609
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
608610
// try again.
609611
cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
610-
cs.blocksDownloadPeerSelector.RankPeer(peer, peerRankDownloadFailed)
611-
return nil, time.Duration(0), peer, false, nil
612+
cs.blocksDownloadPeerSelector.RankPeer(psp, peerRankDownloadFailed)
613+
return nil, time.Duration(0), psp, false, nil
612614
}
613-
return nil, time.Duration(0), peer, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
615+
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
614616
}
615617
// success
616-
return blk, downloadDuration, peer, false, nil
618+
return blk, downloadDuration, psp, false, nil
617619
}
618620

619621
// processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality.

0 commit comments

Comments
 (0)