Skip to content

Commit

Permalink
blockchain/v1: fix deadlock (#5711)
Browse files Browse the repository at this point in the history
I introduced a new variable - syncEnded, which is now used to prevent
sending new events to channels (which would block otherwise) if reactor
is finished syncing

Closes #4591
  • Loading branch information
melekes committed Dec 1, 2020
1 parent 3ae4b5d commit f06feb1
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
### BUG FIXES:

- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)
63 changes: 43 additions & 20 deletions blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"

amino "github.com/tendermint/go-amino"
Expand Down Expand Up @@ -74,6 +75,9 @@ type BlockchainReactor struct {
eventsFromFSMCh chan bcFsmMessage

swReporter *behaviour.SwitchReporter

// Atomic integer (0 - sync in progress, 1 - finished syncing)
syncEnded int32
}

// NewBlockchainReactor returns new reactor instance.
Expand Down Expand Up @@ -145,13 +149,22 @@ func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
go bcR.poolRoutine()
} else { // if we're not fast syncing, mark it as finished
bcR.setSyncEnded()
}
return nil
}

// OnStop implements service.Service.
func (bcR *BlockchainReactor) OnStop() {
_ = bcR.Stop()
}

func (bcR *BlockchainReactor) isSyncEnded() bool {
return atomic.LoadInt32(&(bcR.syncEnded)) != 0
}

func (bcR *BlockchainReactor) setSyncEnded() {
atomic.StoreInt32(&(bcR.syncEnded), 1)
}

// GetChannels implements Reactor
Expand Down Expand Up @@ -208,6 +221,10 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa

// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
if bcR.isSyncEnded() {
return
}

msgData := bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
Expand Down Expand Up @@ -251,6 +268,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
}

case *bcBlockResponseMessage:
if bcR.isSyncEnded() {
return
}

msgForFSM := bcReactorMessage{
event: blockResponseEv,
data: bReactorEventData{
Expand All @@ -264,6 +285,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.messagesForFSMCh <- msgForFSM

case *bcNoBlockResponseMessage:
if bcR.isSyncEnded() {
return
}

msgForFSM := bcReactorMessage{
event: noBlockResponseEv,
data: bReactorEventData{
Expand All @@ -275,6 +300,9 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.messagesForFSMCh <- msgForFSM

case *bcStatusResponseMessage:
if bcR.isSyncEnded() {
return
}
// Got a peer status. Unverified.
msgForFSM := bcReactorMessage{
event: statusResponseEv,
Expand All @@ -285,24 +313,27 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
},
}
bcR.messagesForFSMCh <- msgForFSM

default:
bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
}
}

// processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel
func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) {

processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
doProcessBlockCh := make(chan struct{}, 1)
defer processReceivedBlockTicker.Stop()

lastHundred := time.Now()
lastRate := 0.0
var (
doProcessBlockCh = make(chan struct{}, 1)
lastHundred = time.Now()
lastRate = 0.0
)

ForLoop:
for {
select {
case <-bcR.Quit():
break ForLoop
case <-stopProcessing:
bcR.Logger.Info("finishing block execution")
break ForLoop
Expand Down Expand Up @@ -345,12 +376,14 @@ ForLoop:

// poolRoutine receives and handles messages from the Receive() routine and from the FSM.
func (bcR *BlockchainReactor) poolRoutine() {

bcR.fsm.Start()

sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)

defer sendBlockRequestTicker.Stop()
// NOTE: statusUpdateTicker can continue to run

stopProcessing := make(chan struct{}, 1)
go bcR.processBlocksRoutine(stopProcessing)

Expand Down Expand Up @@ -383,12 +416,10 @@ ForLoop:

case msg := <-bcR.eventsFromFSMCh:
switch msg.event {
case syncFinishedEv:
case syncFinishedEv: // Sent from the FSM when it enters finished state.
stopProcessing <- struct{}{}
// Sent from the FSM when it enters finished state.
break ForLoop
case peerErrorEv:
// Sent from the FSM when it detects peer error
bcR.setSyncEnded()
case peerErrorEv: // Sent from the FSM when it detects peer error
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
if msg.data.err == errNoPeerResponse {
// Sent from the peer timeout handler routine
Expand Down Expand Up @@ -455,7 +486,6 @@ func (bcR *BlockchainReactor) processBlock() error {
return nil
}

// Implements bcRNotifier
// sendStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{
Expand All @@ -465,7 +495,6 @@ func (bcR *BlockchainReactor) sendStatusRequest() {
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
}

// Implements bcRNotifier
// BlockRequest sends `BlockRequest` height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID)
Expand All @@ -481,19 +510,14 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro
return nil
}

// Implements bcRNotifier
func (bcR *BlockchainReactor) switchToConsensus() {
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(bcR.state, bcR.blocksSynced)
bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv}
}
// else {
// Should only happen during testing.
// }
}

// Implements bcRNotifier
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
Expand All @@ -511,7 +535,6 @@ func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.eventsFromFSMCh <- msgData
}

// Implements bcRNotifier
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
if timer == nil {
panic("nil timer pointer parameter")
Expand Down
8 changes: 3 additions & 5 deletions blockchain/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,9 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
}
}

// NOTE: This is too hard to test without
// an easy way to add test peer to switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
// NOTE: This is too hard to test without an easy way to add test peer to
// switch or without significant refactoring of the module. Alternatively we
// could actually dial a TCP conn but that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
numNodes := 4
maxBlockHeight := int64(148)
Expand Down

0 comments on commit f06feb1

Please sign in to comment.