Skip to content

Commit

Permalink
fix to return leveldb access failure as an error instead of panic
Browse files Browse the repository at this point in the history
  • Loading branch information
torao committed Sep 26, 2022
1 parent 1c1c7fb commit b679d60
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 104 deletions.
5 changes: 4 additions & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
block, err := bcR.store.LoadBlock(msg.Height)
if err != nil {
panic(err)
}
if block != nil {
bl, err := block.ToProto()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
block, err := bcR.store.LoadBlock(msg.Height)
if err != nil {
panic(err)
}
if block != nil {
pbbi, err := block.ToProto()
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions blockchain/v2/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
)

type blockStore interface {
LoadBlock(height int64) *types.Block
LoadBlock(height int64) (*types.Block, error)
SaveBlock(*types.Block, *types.PartSet, *types.Commit)
Base() int64
Height() int64
Expand Down Expand Up @@ -480,7 +480,10 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}

case *bcproto.BlockRequest:
block := r.store.LoadBlock(msg.Height)
block, err := r.store.LoadBlock(msg.Height)
if err != nil {

}
if block != nil {
if err = r.io.sendBlockToPeer(block, src.ID()); err != nil {
r.logger.Error("Could not send block message to peer: ", err)
Expand Down
58 changes: 53 additions & 5 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,16 @@ OUTER_LOOP:

// if we never received the commit message from the peer, the block parts wont be initialized
if prs.ProposalBlockParts == nil {
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
blockMeta, err := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if err != nil {
// BlockDB may be closed If the node is stopped during operation. For this reason, if the acquisition
// of seen commit fails and the reactor is stopped, we don't consider an error and terminate the
// operation.
if peer.IsRunning() && conR.IsRunning() {
logger.Error(fmt.Sprintf("Failed to retrieve block meta: %s", err), "height", prs.Height)
}
continue OUTER_LOOP
}
if blockMeta == nil {
heightLogger.Error("Failed to load block meta",
"blockstoreBase", blockStoreBase, "blockstoreHeight", conR.conS.blockStore.Height())
Expand Down Expand Up @@ -618,7 +627,16 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
blockMeta, err := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if err != nil {
// BlockDB may be closed If the node is stopped during operation. For this reason, if the acquisition
// of seen commit fails and the reactor is stopped, we don't consider an error and terminate the
// operation.
if peer.IsRunning() && conR.IsRunning() {
logger.Error(fmt.Sprintf("Failed to retrieve block meta: %s", err), "height", prs.Height)
}
return
}
if blockMeta == nil {
logger.Error("Failed to load block meta", "ourHeight", rs.Height,
"blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height())
Expand All @@ -631,7 +649,16 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
part, err := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if err != nil {
// BlockDB may be closed If the node is stopped during operation. For this reason, if the acquisition
// of seen commit fails and the reactor is stopped, we don't consider an error and terminate the
// operation.
if peer.IsRunning() && conR.IsRunning() {
logger.Error(fmt.Sprintf("Failed to retrieve block part: %s", err), "height", prs.Height)
}
return
}
if part == nil {
logger.Error("Could not load part", "index", index,
"blockPartSetHeader", blockMeta.BlockID.PartSetHeader, "peerBlockPartSetHeader", prs.ProposalBlockPartSetHeader)
Expand Down Expand Up @@ -706,7 +733,17 @@ OUTER_LOOP:
// which contains precommit signatures for prs.Height.
// Originally the block commit was used, but with the addition of the BLS signature-aggregation,
// we use seen commit instead of the block commit because block commit has no individual signature.
if commit := conR.conS.blockStore.LoadSeenCommit(prs.Height); commit != nil {
commit, err := conR.conS.blockStore.LoadSeenCommit(prs.Height)
if err != nil {
// BlockDB may be closed If the node is stopped during operation. For this reason, if the acquisition
// of seen commit fails and the reactor is stopped, we don't consider an error and terminate the
// operation.
if peer.IsRunning() && conR.IsRunning() {
logger.Error(fmt.Sprintf("Failed to retrieve seen commit: %s", err), "height", prs.Height)
}
continue OUTER_LOOP
}
if commit != nil {
if ps.PickSendVote(commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
Expand Down Expand Up @@ -792,6 +829,7 @@ func (conR *Reactor) gossipVotesForHeight(
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (conR *Reactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)

OUTER_LOOP:
for {
Expand Down Expand Up @@ -859,7 +897,17 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
commit, err := conR.conS.LoadCommit(prs.Height)
if err != nil {
// BlockDB may be closed If the node is stopped during operation. For this reason, if the acquisition
// of seen commit fails and the reactor is stopped, we don't consider an error and terminate the
// operation.
if peer.IsRunning() && conR.IsRunning() {
logger.Error(fmt.Sprintf("Failed to retrieve seen commit: %s", err), "height", prs.Height)
}
continue OUTER_LOOP
}
if commit != nil {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Expand Down
16 changes: 12 additions & 4 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ func (h *Handshaker) replayBlocks(
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
block, err := h.store.LoadBlock(i)
if err != nil {
return nil, err
}
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
Expand Down Expand Up @@ -500,15 +503,20 @@ func (h *Handshaker) replayBlocks(

// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
block, err := h.store.LoadBlock(height)
if err != nil {
return sm.State{}, err
}
meta, err := h.store.LoadBlockMeta(height)
if err != nil {
return sm.State{}, err
}

// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block, nil)
if err != nil {
return sm.State{}, err
Expand Down
8 changes: 4 additions & 4 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,11 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
return bs.commits[height-1]
func (bs *mockBlockStore) LoadBlockCommit(height int64) (*types.Commit, error) {
return bs.commits[height-1], nil
}
func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
return bs.commits[height-1]
func (bs *mockBlockStore) LoadSeenCommit(height int64) (*types.Commit, error) {
return bs.commits[height-1], nil
}

func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
Expand Down
22 changes: 17 additions & 5 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (cs *State) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
}

// LoadCommit loads the commit for a given height.
func (cs *State) LoadCommit(height int64) *types.Commit {
func (cs *State) LoadCommit(height int64) (*types.Commit, error) {
cs.mtx.RLock()
defer cs.mtx.RUnlock()

Expand Down Expand Up @@ -608,7 +608,10 @@ func (cs *State) sendInternalMessage(mi msgInfo) {
// Reconstruct LastCommit from SeenCommit, which we saved along with the block,
// (which happens even before saving the state)
func (cs *State) reconstructLastCommit(state sm.State) {
seenCommit := cs.blockStore.LoadSeenCommit(state.LastBlockHeight)
seenCommit, err := cs.blockStore.LoadSeenCommit(state.LastBlockHeight)
if err != nil {
panic(err)
}
if seenCommit == nil {
panic(fmt.Sprintf(
"failed to reconstruct last commit; seen commit for height %v not found",
Expand Down Expand Up @@ -1091,7 +1094,10 @@ func (cs *State) needProofBlock(height int64) bool {
return true
}

lastBlockMeta := cs.blockStore.LoadBlockMeta(height - 1)
lastBlockMeta, err := cs.blockStore.LoadBlockMeta(height - 1)
if err != nil {
panic(err)
}
if lastBlockMeta == nil {
panic(fmt.Sprintf("needProofBlock: last block meta for height %d not found", height-1))
}
Expand Down Expand Up @@ -1885,7 +1891,10 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
cs.metrics.ByzantineVotersPower.Set(float64(byzantineVotersPower))

if height > 1 {
lastBlockMeta := cs.blockStore.LoadBlockMeta(height - 1)
lastBlockMeta, err := cs.blockStore.LoadBlockMeta(height - 1)
if err != nil {
panic(err)
}
if lastBlockMeta != nil {
cs.metrics.BlockIntervalSeconds.Set(
block.Time.Sub(lastBlockMeta.Header.Time).Seconds(),
Expand Down Expand Up @@ -2397,7 +2406,10 @@ func (cs *State) checkDoubleSigningRisk(height int64) error {
}

for i := int64(1); i < doubleSignCheckHeight; i++ {
lastCommit := cs.blockStore.LoadSeenCommit(height - i)
lastCommit, err := cs.blockStore.LoadSeenCommit(height - i)
if err != nil {
panic(err)
}
if lastCommit != nil {
for sigIdx, s := range lastCommit.Signatures {
if s.BlockIDFlag == types.BlockIDFlagCommit && bytes.Equal(s.ValidatorAddress, valAddr) {
Expand Down
5 changes: 4 additions & 1 deletion evidence/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,10 @@ func (evpool *Pool) processConsensusBuffer(state sm.State) {
)
continue
}
blockMeta := evpool.blockStore.LoadBlockMeta(voteSet.VoteA.Height)
blockMeta, err := evpool.blockStore.LoadBlockMeta(voteSet.VoteA.Height)
if err != nil {
panic(err)
}
if blockMeta == nil {
evpool.logger.Error("failed to load block time for conflicting votes", "height", voteSet.VoteA.Height)
continue
Expand Down
4 changes: 2 additions & 2 deletions evidence/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
//go:generate mockery --case underscore --name BlockStore

type BlockStore interface {
LoadBlockMeta(height int64) *types.BlockMeta
LoadBlockCommit(height int64) *types.Commit
LoadBlockMeta(height int64) (*types.BlockMeta, error)
LoadBlockCommit(height int64) (*types.Commit, error)
Height() int64
}
15 changes: 12 additions & 3 deletions evidence/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
)

// verify the time of the evidence
blockMeta := evpool.blockStore.LoadBlockMeta(evidence.Height())
blockMeta, err := evpool.blockStore.LoadBlockMeta(evidence.Height())
if err != nil {
return err
}
if blockMeta == nil {
return fmt.Errorf("don't have header #%d", evidence.Height())
}
Expand Down Expand Up @@ -282,11 +285,17 @@ func validateABCIEvidence(
}

func getSignedHeader(blockStore BlockStore, height int64) (*types.SignedHeader, error) {
blockMeta := blockStore.LoadBlockMeta(height)
blockMeta, err := blockStore.LoadBlockMeta(height)
if err != nil {
return nil, err
}
if blockMeta == nil {
return nil, fmt.Errorf("don't have header at height #%d", height)
}
commit := blockStore.LoadBlockCommit(height)
commit, err := blockStore.LoadBlockCommit(height)
if err != nil {
return nil, err
}
if commit == nil {
return nil, fmt.Errorf("don't have commit at height #%d", height)
}
Expand Down
Loading

0 comments on commit b679d60

Please sign in to comment.