Skip to content

Commit

Permalink
eth: support bubbling up bad blocks from sync to the engine API
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jun 28, 2022
1 parent c7f485d commit 355f049
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 13 deletions.
2 changes: 1 addition & 1 deletion core/beacon/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type payloadAttributesMarshaling struct {

//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go

// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/tree/main/src/engine/specification.md
type ExecutableDataV1 struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`
Expand Down
130 changes: 119 additions & 11 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,47 @@ func Register(stack *node.Node, backend *eth.Ethereum) error {
return nil
}

const (
// invalidBlockHitEviction is the number of times an invalid block can be
// referenced in forkchoice update or new payload before it is attempted
// to be reprocessed again.
invalidBlockHitEviction = 128

// invalidTipsetsCap is the max number of recent block hashes tracked that
// have lead to some bad ancestor block. It's just an OOM protection.
invalidTipsetsCap = 512
)

type ConsensusAPI struct {
eth *eth.Ethereum
eth *eth.Ethereum

remoteBlocks *headerQueue // Cache of remote payloads received
localBlocks *payloadQueue // Cache of local payloads generated
// Lock for the forkChoiceUpdated method
forkChoiceLock sync.Mutex

// The forkchoice update and new payload method require us to return the
// latest valid hash in an invalid chain. To support that return, we need
// to track historical bad blocks as well as bad tipsets in case a chain
// is constantly built on it.
//
// There are a few important caveats in theis mechanism:
// - The bad block tracking is ephemeral, in-memory only. We must never
// persist any bad block informaion to disk as a bug in Geth could end
// up blocking a valid chain, even if a later Geth update would accept
// it.
// - Bad blocks will get forgotten after a certain threshold of import
// attempts and will be retried. The rationale is that if the network
// really-really-really tries to feed us a block, we should give it a
// new chance, perhaps us being racey instead of the block being legit
// bad (this happened in Geth at a point with import vs. pending race).
// - Tracking all the blocks built on top of the bad one could be a bit
// problematic, so we will only track the head chain segment of a bad
// chain to allow discarding progressing bad chains and side chains,
// without tracking too much bad data.
invalidBlocksHits map[common.Hash]int // Emhemeral cache to track invalid blocks and their hit count
invalidTipsets map[common.Hash]*types.Header // Ephemeral cache to track invalid tipsets and their bad ancestor
invalidLock sync.Mutex // Protects the invalid maps from concurrent access

forkChoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
}

// NewConsensusAPI creates a new consensus api for the given backend.
Expand All @@ -65,11 +100,16 @@ func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
panic("Catalyst started without valid total difficulty")
}
return &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
api := &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
invalidBlocksHits: make(map[common.Hash]int),
invalidTipsets: make(map[common.Hash]*types.Header),
}
eth.Downloader().SetBadBlockCallback(api.setInvalidAncestor)

return api
}

// ForkchoiceUpdatedV1 has several responsibilities:
Expand Down Expand Up @@ -97,6 +137,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// reason.
block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash)
if block == nil {
// If this block was previously invalidated, keep rejecting it here too
if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil {
return beacon.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil
}
// If the head hash is unknown (was not given to us in a newPayload request),
// we cannot resolve the header, so not much to do. This could be extended in
// the future to resolve from the `eth` network, but it's an unexpected case
Expand Down Expand Up @@ -267,6 +311,10 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
hash := block.Hash()
return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil
}
// If this block was rejected previously, keep rejecting it
if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil {
return *res, nil
}
// If the parent is missing, we - in theory - could trigger a sync, but that
// would also entail a reorg. That is problematic if multiple sibling blocks
// are being fed to us, and even more so, if some semi-distant uncle shortens
Expand All @@ -275,6 +323,11 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
// update after legit payload executions.
parent := api.eth.BlockChain().GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil {
// Sanity check that this block's parent is not on a previously invalidated
// chain. If it is, mark the block as invalid too.
if res := api.checkInvalidAncestor(block.ParentHash(), block.Hash()); res != nil {
return *res, nil
}
// Stash the block away for a potential forced forckchoice update to it
// at a later time.
api.remoteBlocks.put(block.Hash(), block.Header())
Expand Down Expand Up @@ -310,7 +363,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
}
if block.Time() <= parent.Time() {
log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time())
return api.invalid(errors.New("invalid timestamp"), parent), nil
return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())
Expand All @@ -320,7 +373,13 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
log.Warn("NewPayloadV1: inserting block failed", "error", err)
return api.invalid(err, parent), nil

api.invalidLock.Lock()
api.invalidBlocksHits[block.Hash()] = 1
api.invalidTipsets[block.Hash()] = block.Header()
api.invalidLock.Unlock()

return api.invalid(err, parent.Header()), nil
}
// We've accepted a valid payload from the beacon client. Mark the local
// chain transitions to notify other subsystems (e.g. downloader) of the
Expand All @@ -346,14 +405,63 @@ func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttribute
return out
}

// setInvalidAncestor is a callback for the downloader to notify us if a bad block
// is encountered during the async sync.
func (api *ConsensusAPI) setInvalidAncestor(invalid *types.Header, origin *types.Header) {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

api.invalidTipsets[origin.Hash()] = invalid
}

// checkInvalidAncestor checks whether the specified chain end links to a known
// bad ancestor. If yes, it constructs the payload failure response to return.
func (api *ConsensusAPI) checkInvalidAncestor(check common.Hash, head common.Hash) *beacon.PayloadStatusV1 {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

// If the hash to check is unknown, return valid
invalid, ok := api.invalidTipsets[check]
if !ok {
return nil
}
// If the bad hash was hit too many times, evict it and try to reprocess in
// the hopes that we have a data race that we can exit out of.
badHash := invalid.Hash()

api.invalidBlocksHits[badHash]++
if api.invalidBlocksHits[badHash] >= invalidBlockHitEviction {
log.Warn("Too many bad block import attempt, trying", "number", invalid.Number, "hash", badHash)
delete(api.invalidBlocksHits, badHash)
return nil
}
// Not too many failures yet, mark the head of the invalid chain as invalid
if check != head {
log.Warn("Marked new chain head as invalid", "hash", head, "badnumber", invalid.Number, "badhash", badHash)
api.invalidTipsets[head] = invalid
if len(api.invalidTipsets) > invalidTipsetsCap {
for key := range api.invalidTipsets {
delete(api.invalidTipsets, key)
break
}
}
}
failure := "links to previously rejected block"
return &beacon.PayloadStatusV1{
Status: beacon.INVALID,
LatestValidHash: &invalid.ParentHash,
ValidationError: &failure,
}
}

// invalid returns a response "INVALID" with the latest valid hash supplied by latest or to the current head
// if no latestValid block was provided.
func (api *ConsensusAPI) invalid(err error, latestValid *types.Block) beacon.PayloadStatusV1 {
func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) beacon.PayloadStatusV1 {
currentHash := api.eth.BlockChain().CurrentBlock().Hash()
if latestValid != nil {
// Set latest valid hash to 0x0 if parent is PoW block
currentHash = common.Hash{}
if latestValid.Difficulty().BitLen() == 0 {
if latestValid.Difficulty.BitLen() == 0 {
// Otherwise set latest valid hash to parent hash
currentHash = latestValid.Hash()
}
Expand Down
7 changes: 7 additions & 0 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func (b *beaconBackfiller) setMode(mode SyncMode) {
b.resume()
}

// SetBadBlockCallback sets the callback to run when a bad block is hit by the
// block processor. This method is not thread safe and should be set only once
// on startup before system events are fired.
func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
d.badBlock = onBadBlock
}

// BeaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
Expand Down
17 changes: 16 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// badBlockFn is a callback for the async beacon sync to notify the caller that
// the origin header requested to sync to, produced a chain with a bad block.
type badBlockFn func(invalid *types.Header, origin *types.Header)

// headerTask is a set of downloaded headers to queue along with their precomputed
// hashes to avoid constant rehashing.
type headerTask struct {
Expand Down Expand Up @@ -113,6 +117,7 @@ type Downloader struct {

// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving
badBlock badBlockFn // Reports a block as rejected by the chain

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
Expand Down Expand Up @@ -1528,7 +1533,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
return errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
// Retrieve a batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
Expand All @@ -1544,6 +1549,16 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
if index, err := d.blockchain.InsertChain(blocks); err != nil {
if index < len(results) {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)

// In post-merge, notify the engine API of encountered bad chains
if d.badBlock != nil {
head, _, err := d.skeleton.Bounds()
if err != nil {
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
} else {
d.badBlock(blocks[index].Header(), head)
}
}
} else {
// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
// when it needs to preprocess blocks to import a sidechain.
Expand Down

0 comments on commit 355f049

Please sign in to comment.