Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[zurich] Ghost-128 patch: downloader sync periodic TD check #544

Merged
merged 3 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 78 additions & 5 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ var (
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync

maxTotalDifficultyDistance = 10 // Maximum amount of block difficulty units the master peer can lag behind w.r.t. other peers
totalDifficultyContCheck = 13 * time.Second // Time interval to wait between total difficulty checks
)

var (
Expand Down Expand Up @@ -129,6 +132,7 @@ type Downloader struct {

// Channels
headerProcCh chan *headerTask // Channel to feed the header processor new tasks
totalDiffCh chan struct{} // Channel to notify the total difficulty checker about end of header processing

// Skeleton sync
skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode)
Expand All @@ -140,6 +144,9 @@ type Downloader struct {
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync

// Sync target total difficulty
td *big.Int

// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
Expand Down Expand Up @@ -233,6 +240,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
totalDiffCh: make(chan struct{}),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
Expand Down Expand Up @@ -419,6 +427,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
default:
}
}
select {
case <-d.totalDiffCh:
default:
}
for empty := false; !empty; {
select {
case <-d.headerProcCh:
Expand All @@ -430,6 +442,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
d.cancelPeer = id
d.td = td
d.cancelLock.Unlock()

defer d.Cancel() // No matter what, we can't leave the cancel channel open
Expand Down Expand Up @@ -637,7 +650,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
headerFetcher, // Headers are always retrieved
func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td, ttd, beaconMode) },
func() error { return d.processHeaders(origin+1, ttd, beaconMode) },
}
if mode == SnapSync {
d.pivotLock.Lock()
Expand All @@ -648,6 +661,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
} else if mode == FullSync {
fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) })
}
fetchers = append(fetchers, func() error { return d.fetchTotalDifficulty(p, latest) })

return d.spawnSync(fetchers)
}

Expand Down Expand Up @@ -729,7 +744,8 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
mode := d.getMode()

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
latest, peerTd, _ := p.peer.Head()
d.td = peerTd
fetch := 1
if mode == SnapSync {
fetch = 2 // head + pivot headers
Expand Down Expand Up @@ -1013,6 +1029,59 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
return start, nil
}

// fetchTotalDifficulty periodically monitors the sync's total difficulty
// target while headers are being processed. It serves two purposes:
//
//   - It aborts the sync with errUnsyncedPeer if any other connected peer
//     advertises a total difficulty exceeding the current target by more than
//     maxTotalDifficultyDistance blocks' worth of difficulty (approximated by
//     that peer's latest block difficulty), signalling that the master peer is
//     lagging significantly behind the rest of the network.
//   - It extends the sync target when the master peer's chain keeps growing
//     past the originally announced head, so a long-running sync does not
//     stall on a stale target.
//
// The loop terminates cleanly when header processing signals completion on
// totalDiffCh, or with errCanceled when the sync is torn down.
//
// NOTE(review): d.td is read and written here without holding cancelLock,
// unlike the write in synchronise — the channel handshake on totalDiffCh
// appears to order accesses, but confirm there is no concurrent reader.
func (d *Downloader) fetchTotalDifficulty(p *peerConnection, latest *types.Header) error {
	head := latest

	// Reuse one timer across iterations instead of allocating a fresh one
	// via time.After on every pass through the loop.
	timer := time.NewTimer(totalDifficultyContCheck)
	defer timer.Stop()

	for {
		select {
		case <-d.cancelCh:
			return errCanceled
		case <-d.totalDiffCh:
			// Header processing finished; nothing left to watch.
			return nil
		case <-timer.C:
			// Check whether any other peer advertises a chain significantly
			// heavier than our current sync target.
			for _, peer := range d.peers.AllPeers() {
				_, peerTd, peerDifficulty := peer.peer.Head()
				distance := new(big.Int).Sub(peerTd, d.td)
				if distance.Sign() <= 0 || peerDifficulty.Sign() == 0 {
					// Peer is not ahead of us, or has no difficulty info to
					// scale the allowed distance by.
					continue
				}
				threshold := new(big.Int).Mul(big.NewInt(int64(maxTotalDifficultyDistance)), peerDifficulty)
				if distance.Cmp(threshold) > 0 {
					log.Warn("Found significantly higher total difficulty", "td", d.td, "better", peerTd)
					return fmt.Errorf("%w: other peers have significantly heavier chains", errUnsyncedPeer)
				}
			}
			// Probe the master peer for chain progress past the current head.
			headers, _, err := d.fetchHeadersByNumber(p, head.Number.Uint64()+1, 8, 0, false)
			if err != nil {
				return fmt.Errorf("%w: header request failed: %v", errBadPeer, err)
			}
			// Drop the newest few headers to protect against shallow reorgs.
			ignore := reorgProtHeaderDelay
			if ignore > len(headers) {
				ignore = len(headers)
			}
			headers = headers[:len(headers)-ignore]

			// Accumulate the difficulty of the new headers on top of the
			// current target; the target is only ever moved forward.
			newTd := new(big.Int).Set(d.td)
			for _, header := range headers {
				newTd.Add(newTd, header.Difficulty)
			}
			if newTd.Cmp(d.td) > 0 {
				// newTd > d.td implies at least one retained header, so the
				// index below cannot underflow.
				head = headers[len(headers)-1]
				p.peer.SetHead(head.Hash(), newTd, head.Difficulty)
				log.Debug("Updating sync target total difficulty", "old", d.td, "new", newTd)
				d.td = newTd
			}
			timer.Reset(totalDifficultyContCheck)
		}
	}
}

// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
Expand Down Expand Up @@ -1284,7 +1353,7 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
func (d *Downloader) processHeaders(origin uint64, ttd *big.Int, beaconMode bool) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
Expand Down Expand Up @@ -1332,6 +1401,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
case <-d.cancelCh:
}
}
select {
case d.totalDiffCh <- struct{}{}:
case <-d.cancelCh:
}
// If we're in legacy sync mode, we need to check total difficulty
// violations from malicious peers. That is not needed in beacon
// mode and we can skip to terminating sync.
Expand All @@ -1350,7 +1423,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
if !gotHeaders && d.td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
Expand All @@ -1363,7 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// peer gave us something useful, we're already happy/progressed (above check).
if mode == SnapSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
if d.td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
Expand Down
21 changes: 17 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,24 @@ type downloadTesterPeer struct {
chain *core.BlockChain

withholdHeaders map[common.Hash]struct{}
fakeTD *big.Int
}

// Head constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int, *big.Int) {
head := dlp.chain.CurrentBlock()
return head.Hash(), dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
td := dlp.chain.GetTd(head.Hash(), head.Number.Uint64())
if dlp.fakeTD != nil {
td.Set(dlp.fakeTD)
}
return head.Hash(), td, new(big.Int).Set(dlp.chain.CurrentBlock().Difficulty)
}

// SetHead is a no-op for the download tester: the simulated peer's head and
// difficulties are always derived directly from its backing chain in Head,
// so there is no separate state to update.
func (dlp *downloadTesterPeer) SetHead(common.Hash, *big.Int, *big.Int) {
	// noop
}

func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
Expand Down Expand Up @@ -986,8 +997,10 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) {
defer tester.terminate()

chain := testChainBase.shorten(1)
tester.newPeer("attack", protocol, chain.blocks[1:])
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
dlp := tester.newPeer("attack", protocol, chain.blocks[1:])
fakeTD := big.NewInt(1000000)
dlp.fakeTD = fakeTD
meowsbits marked this conversation as resolved.
Show resolved Hide resolved
if err := tester.sync("attack", fakeTD, mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
}
Expand Down
8 changes: 6 additions & 2 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type peerConnection struct {

// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, *big.Int)
Head() (common.Hash, *big.Int, *big.Int)
SetHead(common.Hash, *big.Int, *big.Int)
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
}
Expand All @@ -74,7 +75,10 @@ type lightPeerWrapper struct {
peer LightPeer
}

// Head proxies the head retrieval (hash, total difficulty, latest block
// difficulty) to the wrapped light peer.
func (w *lightPeerWrapper) Head() (common.Hash, *big.Int, *big.Int) { return w.peer.Head() }

// SetHead proxies the head update to the wrapped light peer.
func (w *lightPeerWrapper) SetHead(head common.Hash, td *big.Int, blockDifficulty *big.Int) {
	w.peer.SetHead(head, td, blockDifficulty)
}
func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
return w.peer.RequestHeadersByHash(h, amount, skip, reverse, sink)
}
Expand Down
8 changes: 6 additions & 2 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
return req, nil
}

func (p *skeletonTestPeer) Head() (common.Hash, *big.Int) {
func (p *skeletonTestPeer) Head() (common.Hash, *big.Int, *big.Int) {
panic("skeleton sync must not request the remote head")
}

func (p *skeletonTestPeer) SetHead(common.Hash, *big.Int, *big.Int) {
panic("skeleton sync must not request the remote head")
}

Expand Down Expand Up @@ -514,7 +518,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
// Tests that the skeleton sync correctly retrieves headers from one or more
// peers without duplicates or other strange side effects.
func TestSkeletonSyncRetrievals(t *testing.T) {
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))

// Since skeleton headers don't need to be meaningful, beyond a parent hash
// progression, create a long fake chain to test with.
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
trueTD = new(big.Int).Sub(td, block.Difficulty())
)
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
if _, td, _ := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD, block.Difficulty())
h.chainSync.handlePeerEvent(peer)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ethPeer struct {

// info gathers and returns some `eth` protocol metadata known about a peer.
func (p *ethPeer) info() *ethPeerInfo {
hash, td := p.Head()
hash, td, _ := p.Head()

info := &ethPeerInfo{
Version: p.Version(),
Expand Down
4 changes: 2 additions & 2 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (ps *peerSet) peerWithHighestTD() *eth.Peer {
bestTd *big.Int
)
for _, p := range ps.peers {
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
if _, td, _ := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p.Peer, td
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (ps *peerSet) WorstPeer() *ethPeer {
worstTD *big.Int
)
for _, p := range ps.peers {
if _, td := p.Head(); worstPeer == nil || td.Cmp(worstTD) < 0 {
if _, td, _ := p.Head(); worstPeer == nil || td.Cmp(worstTD) < 0 {
worstPeer, worstTD = p, td
}
}
Expand Down
15 changes: 9 additions & 6 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type Peer struct {
rw p2p.MsgReadWriter // Input/output streams for snap
version uint // Protocol version negotiated

head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
forkid forkid.ID // Advertised forkid at time of handshake
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
forkid forkid.ID // Advertised forkid at time of handshake
blockDifficulty *big.Int // Latest advertised head block difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -113,6 +114,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
resDispatch: make(chan *response),
txpool: txpool,
term: make(chan struct{}),
blockDifficulty: big.NewInt(0),
}
// Start up all the broadcasters
go peer.broadcastBlocks()
Expand Down Expand Up @@ -141,21 +143,22 @@ func (p *Peer) Version() uint {
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
func (p *Peer) Head() (hash common.Hash, td, blockDifficulty *big.Int) {
p.lock.RLock()
defer p.lock.RUnlock()

copy(hash[:], p.head[:])
return hash, new(big.Int).Set(p.td)
return hash, new(big.Int).Set(p.td), new(big.Int).Set(p.blockDifficulty)
}

// SetHead updates the head hash and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
func (p *Peer) SetHead(hash common.Hash, td, blockDifficulty *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()

copy(p.head[:], hash[:])
p.td.Set(td)
p.blockDifficulty.Set(blockDifficulty)
}

// ForkID retrieves the reported forkid at the time of handshake.
Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
}

// peerToSyncOp derives a chain sync operation targeting the given peer's
// currently advertised head hash and total difficulty. The peer's latest
// block difficulty is not needed for scheduling and is discarded.
func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
	peerHead, peerTD, _ := p.Head()
	return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead}
}

Expand Down