Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth, les, light: enforce CHT checkpoints on fast-sync too #19468

Merged
merged 1 commit into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func copyDb(ctx *cli.Context) error {

chain, chainDb := utils.MakeChain(ctx, stack)
syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil)
dl := downloader.New(syncmode, 0, chainDb, new(event.TypeMux), chain, nil, nil)

// Create a source peer to satisfy downloader requests from
db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256, "")
Expand Down
20 changes: 13 additions & 7 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
errUnknownPeer = errors.New("peer is unknown or unhealthy")
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errUnsyncedPeer = errors.New("unsynced peer")
errNoPeers = errors.New("no peers to keep download active")
errTimeout = errors.New("timeout")
errEmptyHeaderSet = errors.New("empty header set by peer")
Expand All @@ -99,10 +100,11 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events

genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync)
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database

rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
Expand Down Expand Up @@ -205,15 +207,15 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}

dl := &Downloader{
mode: mode,
stateDB: stateDb,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
Expand Down Expand Up @@ -326,7 +328,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
case nil:
case errBusy:

case errTimeout, errBadPeer, errStallingPeer,
case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
Expand Down Expand Up @@ -578,6 +580,10 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
return nil, errBadPeer
}
head := headers[0]
if d.mode == FastSync && head.Number.Uint64() < d.checkpoint {
p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
return nil, errUnsyncedPeer
}
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil

Expand Down
41 changes: 39 additions & 2 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"testing"
"time"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -75,7 +75,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})

tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
return tester
}

Expand Down Expand Up @@ -1051,6 +1051,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it
{errStallingPeer, true}, // Peer was detected to be stalling, drop it
{errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
{errNoPeers, false}, // No peers to download from, soft race, no issue
{errTimeout, true}, // No hashes received in due time, drop the peer
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
Expand Down Expand Up @@ -1569,3 +1570,39 @@ func TestRemoteHeaderRequestSpan(t *testing.T) {
}
}
}

// Tests that peers below a pre-configured checkpoint block are prevented from
// being fast-synced from, avoiding potential cheap eclipse attacks.
func TestCheckpointEnforcement62(t *testing.T) { testCheckpointEnforcement(t, 62, FullSync) }
func TestCheckpointEnforcement63Full(t *testing.T) { testCheckpointEnforcement(t, 63, FullSync) }
func TestCheckpointEnforcement63Fast(t *testing.T) { testCheckpointEnforcement(t, 63, FastSync) }
func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) }
func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) }
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }

func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()

// Create a new tester with a particular hard coded checkpoint block
tester := newTester()
defer tester.terminate()

tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)

// Attempt to sync with the peer and validate the result
tester.newPeer("peer", protocol, chain)

var expect error
if mode == FastSync {
expect = errUnsyncedPeer
}
if err := tester.sync("peer", nil, mode); err != expect {
t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
}
if mode == FastSync {
assertOwnChain(t, tester, 1)
} else {
assertOwnChain(t, tester, chain.len())
}
}
77 changes: 38 additions & 39 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
Expand All @@ -55,7 +54,7 @@ const (
)

var (
daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
)

// errIncompatibleConfig is returned if the requested protocols and configs are
Expand All @@ -72,6 +71,9 @@ type ProtocolManager struct {
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

checkpointNumber uint64 // Block number for the sync progress validator to cross reference
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference

txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
Expand Down Expand Up @@ -126,6 +128,11 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
// If we have trusted checkpoints, enforce them on the chain
if checkpoint, ok := params.TrustedCheckpoints[blockchain.Genesis().Hash()]; ok {
manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
manager.checkpointHash = checkpoint.SectionHead
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
Expand Down Expand Up @@ -165,7 +172,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
Expand Down Expand Up @@ -291,22 +298,22 @@ func (pm *ProtocolManager) handle(p *peer) error {
// after this will be sent via broadcasts.
pm.syncTransactions(p)

// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
// If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
if pm.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() {
p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name())
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
if p.syncDrop != nil {
p.syncDrop.Stop()
p.syncDrop = nil
}
}()
}
Expand Down Expand Up @@ -438,41 +445,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// If no headers were received, but we're expending a DAO fork check, maybe it's that
if len(headers) == 0 && p.forkDrop != nil {
// Possibly an empty reply to the fork header checks, sanity check TDs
verifyDAO := true

// If we already have a DAO header, we can check the peer's TD against it. If
// the peer's ahead of this, it too must have a reply to the DAO check
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
verifyDAO = false
}
}
// If we're seemingly on the same chain, disable the drop timer
if verifyDAO {
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
// If no headers were received, but we're expencting a checkpoint header, consider it that
if len(headers) == 0 && p.syncDrop != nil {
// Stop the timer either way, decide later to drop or not
p.syncDrop.Stop()
p.syncDrop = nil

// If we're doing a fast sync, we must enforce the checkpoint block to avoid
// eclipse attacks. Unsynced nodes are welcome to connect after we're done
// joining the network
if atomic.LoadUint32(&pm.fastSync) == 1 {
p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name())
return errors.New("unsynced node cannot serve fast sync")
}
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
// If it's a potential DAO fork check, validate against the rules
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
// Disable the fork drop timer
p.forkDrop.Stop()
p.forkDrop = nil
// If it's a potential sync progress check, validate the content and advertised chain weight
if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber {
// Disable the sync drop timer
p.syncDrop.Stop()
p.syncDrop = nil

// Validate the header and either drop the peer or continue
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
if headers[0].Hash() != pm.checkpointHash {
return errors.New("checkpoint hash mismatch")
}
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
// Otherwise if it's a whitelisted block, validate against the set
Expand Down
Loading