Chain repair fixes #1163

Merged: 16 commits, Oct 15, 2020
24 changes: 24 additions & 0 deletions consensus/istanbul/backend/engine.go
@@ -134,6 +134,27 @@ func (sb *Backend) verifyHeader(chain consensus.ChainReader, header *types.Header
return sb.verifyCascadingFields(chain, header, parents)
}

// checkEpochBlockExists is a sanity check for lightest sync mode: it verifies that
// the correct epoch block for this header is available.
func (sb *Backend) checkEpochBlockExists(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error {
number := header.Number.Uint64()
// Check that latest epoch block is available
epoch := istanbul.GetEpochNumber(number, sb.config.Epoch)
epochBlockNumber := istanbul.GetEpochLastBlockNumber(epoch-1, sb.config.Epoch)
if number == epochBlockNumber {
epochBlockNumber = istanbul.GetEpochLastBlockNumber(epoch-2, sb.config.Epoch)
}
for _, hdr := range parents {
if hdr.Number.Uint64() == epochBlockNumber {
return nil
}
}
parent := chain.GetHeaderByNumber(epochBlockNumber)
if parent == nil || parent.Number.Uint64() != epochBlockNumber {
return consensus.ErrUnknownAncestor
}
return nil
}

// verifyCascadingFields verifies all the header fields that are not standalone,
// rather depend on a batch of previous headers. The caller may optionally pass
// in a batch of parents (ascending order) to avoid looking those up from the
@@ -163,6 +184,8 @@ func (sb *Backend) verifyCascadingFields(chain consensus.ChainReader, header *types.Header
if err := sb.verifySigner(chain, header, parents); err != nil {
return err
}
} else if err := sb.checkEpochBlockExists(chain, header, parents); err != nil {
return err
}

return sb.verifyAggregatedSeals(chain, header, parents)
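
The lightest-mode check above boils down to simple epoch arithmetic: a header can only be verified if the last block of the previous epoch (or the one before it, when the header is itself that epoch block) is reachable. The sketch below illustrates the shape of that calculation with stand-in formulas; the real istanbul.GetEpochNumber and GetEpochLastBlockNumber implementations are authoritative, and the epoch size and formulas here are assumptions for illustration only.

```go
package main

import "fmt"

const epochSize = 100 // illustrative epoch length, not a protocol constant

// Stand-ins for istanbul.GetEpochNumber / GetEpochLastBlockNumber; the exact
// boundary handling in the real helpers may differ.
func getEpochNumber(number uint64) uint64   { return number/epochSize + 1 }
func getEpochLastBlock(epoch uint64) uint64 { return epoch * epochSize }

// requiredEpochBlock mirrors the structure of checkEpochBlockExists: find the
// epoch block that must be available before this header can be verified.
func requiredEpochBlock(number uint64) uint64 {
	epoch := getEpochNumber(number)
	epochBlock := getEpochLastBlock(epoch - 1)
	if number == epochBlock {
		// The header is itself an epoch block; require the one before it.
		epochBlock = getEpochLastBlock(epoch - 2)
	}
	return epochBlock
}

func main() {
	for _, n := range []uint64{150, 200, 250} {
		fmt.Printf("header %d -> epoch block %d must be known\n", n, requiredEpochBlock(n))
	}
}
```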
@@ -764,6 +787,7 @@ func (sb *Backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash
if s, err := loadSnapshot(sb.config.Epoch, sb.db, blockHash); err == nil {
log.Trace("Loaded validator set snapshot from disk", "number", numberIter, "hash", blockHash)
snap = s
sb.recentSnapshots.Add(numberIter, snap)
break
}
}
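
The one-line addition in this hunk makes a snapshot loaded from disk also land in the in-memory LRU, so repeated lookups for the same block stop hitting the database. A minimal sketch of that cache-after-load pattern, with hypothetical loadFromDisk and snapshot types rather than the backend's actual ones:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

// snapshot stands in for the istanbul validator-set snapshot.
type snapshot struct{ number uint64 }

// loadFromDisk stands in for loadSnapshot; assume it is the slow path.
func loadFromDisk(number uint64) *snapshot {
	fmt.Println("loading snapshot from disk for block", number)
	return &snapshot{number: number}
}

func getSnapshot(cache *lru.Cache, number uint64) *snapshot {
	if s, ok := cache.Get(number); ok {
		return s.(*snapshot) // served from memory
	}
	s := loadFromDisk(number)
	cache.Add(number, s) // the pattern added by the diff: cache the disk hit
	return s
}

func main() {
	cache, _ := lru.New(16)
	getSnapshot(cache, 100) // disk load
	getSnapshot(cache, 100) // memory hit, no disk access
}
```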
210 changes: 87 additions & 123 deletions core/blockchain.go
@@ -119,6 +119,14 @@ type CacheConfig struct {
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
}

// defaultCacheConfig holds the default caching values used if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
@@ -186,11 +194,7 @@ type BlockChain struct {
// Processor.
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
}
cacheConfig = defaultCacheConfig
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
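
With the two hunks above, callers that pass a nil cache configuration now pick up the shared defaultCacheConfig instead of an inline literal. A small sketch of that defaulting step, reduced to the three fields shown in the diff (the real CacheConfig has more options):

```go
package main

import (
	"fmt"
	"time"
)

// CacheConfig mirrors only the fields visible in the diff.
type CacheConfig struct {
	TrieCleanLimit int
	TrieDirtyLimit int
	TrieTimeLimit  time.Duration
}

var defaultCacheConfig = &CacheConfig{
	TrieCleanLimit: 256,
	TrieDirtyLimit: 256,
	TrieTimeLimit:  5 * time.Minute,
}

// effectiveCacheConfig sketches the defaulting done at the top of NewBlockChain.
func effectiveCacheConfig(cacheConfig *CacheConfig) *CacheConfig {
	if cacheConfig == nil {
		cacheConfig = defaultCacheConfig
	}
	return cacheConfig
}

func main() {
	cfg := effectiveCacheConfig(nil)
	fmt.Println(cfg.TrieCleanLimit, cfg.TrieTimeLimit) // 256 5m0s
}
```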
@@ -240,14 +244,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
}

if err := bc.loadLastState(); err != nil {
return nil, err
}
// The first thing the node will do is reconstruct the verification data for
// the head block. Might as well do it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)

// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache); err != nil {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
@@ -257,7 +265,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig
// blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store.
fullBlock := bc.CurrentBlock()
if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 {
needRewind = true
low = fullBlock.NumberU64()
}
@@ -272,15 +280,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig
}
}
if needRewind {
var hashes []common.Hash
previous := bc.CurrentHeader().Number.Uint64()
for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
if err := bc.SetHead(low); err != nil {
return nil, err
}
bc.Rollback(hashes, true)
log.Warn("Truncate ancient chain", "from", previous, "to", low)
}
}
// The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
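
Startup now repairs a crash that happened mid-SetHead: if the freezer already holds more blocks than the current full (or fast) head covers, the chain is rewound to the lower head via SetHead instead of the old hashes/Rollback path. The sketch below reproduces only the target-selection arithmetic with plain numbers; the genesis and fast-sync conditions are assumptions based on the visible hunk, and the real code reads its inputs from bc.CurrentBlock(), bc.CurrentFastBlock() and bc.db.Ancients().

```go
package main

import "fmt"

// rewindTarget sketches the crash-recovery decision: if the ancient store
// (frozen items) extends past the current full or fast head, a previous
// SetHead was interrupted and the chain should be rewound to the lower head.
func rewindTarget(frozen, fullHead, fastHead uint64) (uint64, bool) {
	if frozen == 0 {
		return 0, false // empty freezer, nothing to repair
	}
	needRewind, low := false, uint64(0)
	if fullHead != 0 && fullHead < frozen-1 {
		needRewind, low = true, fullHead
	}
	if fastHead != 0 && fastHead < frozen-1 && (!needRewind || fastHead < low) {
		needRewind, low = true, fastHead
	}
	return low, needRewind
}

func main() {
	// Example: the freezer holds 90,000 items but the full head was repaired
	// back to block 1,000, so the whole chain is rewound to 1,000.
	low, rewind := rewindTarget(90000, 1000, 89998)
	fmt.Println(low, rewind) // 1000 true
}
```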

// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
@@ -289,7 +299,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig
// make sure the headerByNumber (if present) is in our current canonical chain
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1)
if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
return nil, err
}
log.Error("Chain rewind was successful, resuming normal operation")
}
}
@@ -344,15 +356,6 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
// Dangling block without a state associated, init from scratch
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
return err
}
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
@@ -387,30 +390,48 @@ func (bc *BlockChain) loadLastState() error {
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))

if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
log.Info("Loaded last fast-sync pivot marker", "number", *pivot)
}
return nil
}

// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
// SetHead rewinds the local chain to a new head. Depending on whether the node
// was fast synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)

bc.chainmu.Lock()
defer bc.chainmu.Unlock()

updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
// Retrieve the last pivot block to short-circuit rollbacks beyond it and the
// current freezer limit to start nuking if underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
frozen, _ := bc.db.Ancients()

updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
// Rewind the block chain, ensuring we don't end up with a stateless head
// block. Note, depth equality is permitted to allow using SetHead as a
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
newHeadBlock = bc.genesisBlock
// Block exists, keep rewinding until we find one with state
for {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
log.Info("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
continue
} else {
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
newHeadBlock = bc.genesisBlock
}
}
log.Info("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
@@ -422,7 +443,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}

// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
@@ -438,20 +458,29 @@ func (bc *BlockChain) SetHead(head uint64) error {
// to low, so it's safe to update the in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
log.Info("Rewound fast block", "number", newHeadFastBlock.NumberU64())
}
}
head := bc.CurrentBlock().NumberU64()

// If SetHead underflowed the freezer threshold and the block processing
// intent afterwards is full block importing, delete the chain segment
// between the stateful block and the SetHead target.
var wipe bool
if head+1 < frozen {
wipe = pivot == nil || head >= *pivot
}
return head, wipe // Only force wipe if full synced
}
// Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all related data (header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num + 1); err != nil {
if err := bc.db.TruncateAncients(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}

// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
@@ -463,8 +492,18 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
bc.hc.SetHead(head, updateFn, delFn)

// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
if block := bc.CurrentBlock(); block.NumberU64() == head {
if target, force := updateFn(bc.db, block.Header()); force {
bc.hc.SetHead(target, updateFn, delFn)
}
} else {
// Rewind the chain to the requested head and keep going backwards until a
// block with a state is found or fast sync pivot is passed
log.Warn("Rewinding blockchain", "target", head)
bc.hc.SetHead(head, updateFn, delFn)
}
// Clear out any stale content from the caches
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
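
Taken together, the new updateFn walks back from the requested head until it finds a block whose state is still on disk, gives up and resets to genesis once the rewind passes the fast-sync pivot, and then decides whether the frozen segment above that stateful head must be wiped. The sketch below mirrors that decision shape with plain values; hasState is a hypothetical stand-in for state.New against bc.stateCache, and the pivot/frozen handling follows the hunks above.

```go
package main

import "fmt"

// rewindToStateful walks back from head until a block with available state is
// found. Below the fast-sync pivot no states were ever downloaded, so the
// rewind falls back to genesis instead of walking further.
func rewindToStateful(head uint64, pivot *uint64, hasState func(uint64) bool) uint64 {
	for n := head; ; n-- {
		if hasState(n) {
			return n // new head: first block that still has its state
		}
		if n == 0 || (pivot != nil && n <= *pivot) {
			return 0 // rewind passed the pivot (or genesis): reset to genesis
		}
	}
}

// shouldWipe mirrors the wipe decision returned by updateFn: only force-delete
// the frozen segment if the head dropped below the freezer threshold and the
// node will full-sync afterwards (no pivot, or head at/above the pivot).
func shouldWipe(head, frozen uint64, pivot *uint64) bool {
	if head+1 >= frozen {
		return false
	}
	return pivot == nil || head >= *pivot
}

func main() {
	hasState := func(n uint64) bool { return n <= 995 } // states pruned above 995
	pivot := uint64(500)

	newHead := rewindToStateful(1000, &pivot, hasState)
	fmt.Println("new head:", newHead)                             // 995
	fmt.Println("wipe frozen:", shouldWipe(newHead, 900, &pivot)) // false
}
```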
@@ -569,36 +608,6 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
return nil
}

// repair tries to repair the current blockchain by rolling back the current block
// until one with associated state is found. This is needed to fix incomplete db
// writes caused either by crashes/power outages, or simply non-committed tries.
//
// This method only rolls back the current block. The current header and current
// fast block are left intact.
func (bc *BlockChain) repair(head **types.Block) error {
batch := bc.db.NewBatch()

for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
if err := batch.Write(); err != nil {
log.Error("Error when removing sidechain", "err", err)
}
bc.purge()
return nil
}
// Otherwise rewind one block and recheck state availability there
block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
if block == nil {
return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
}
// It's safest to remove these blocks
rawdb.DeleteBlock(batch, (*head).Hash(), (*head).NumberU64())
*head = block
}
}

// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
@@ -893,52 +902,6 @@ const (
SideStatTy
)

// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash, fullHeaderChainAvailable bool) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]

// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe to update the in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback chain markers", "err", err)
}
// Truncate ancient data which exceeds the current header.
//
// Notably, it can happen that the system crashes without truncating the ancient data
// but the head indicator has been updated in the active store. In that case, the
// system will self-recover by truncating the extra data during the setup phase.
if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}

// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
@@ -2235,7 +2198,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int, co
_, err := bc.hc.WriteHeader(header)
return err
}
return bc.hc.InsertHeaderChain(chain, whFunc, start)
res, err := bc.hc.InsertHeaderChain(chain, whFunc, start)
return res, err
}

// CurrentHeader retrieves the current head header of the canonical chain. The