Skip to content

Commit fb0450e

Browse files
committed
Properly handle ticks when waiting for In-Flight view to finish
Due to a bug, When the view changer initializes an in-flight view it only waits for a single tick before resuming view change standard operation. Another part of the bug is that it anyway returns success, which in turn makes the sequence of the controller be initialized to an old sequence. Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
1 parent 97c6a40 commit fb0450e

File tree

4 files changed

+186
-17
lines changed

4 files changed

+186
-17
lines changed

internal/bft/viewchanger.go

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,13 @@ func (v *ViewChanger) checkIfResendViewChange(now time.Time) {
233233
}
234234
}
235235

236-
func (v *ViewChanger) checkIfTimeout(now time.Time) {
236+
func (v *ViewChanger) checkIfTimeout(now time.Time) bool {
237237
if !v.checkTimeout {
238-
return
238+
return false
239239
}
240240
nextTimeout := v.startViewChangeTime.Add(v.ViewChangeTimeout * time.Duration(v.backOffFactor))
241241
if nextTimeout.After(now) { // check if timeout has passed
242-
return
242+
return false
243243
}
244244
v.Logger.Debugf("Node %d got a view change timeout, the current view is %d", v.SelfID, v.currView)
245245
v.checkTimeout = false // stop timeout for now, a new one will start when a new view change begins
@@ -248,6 +248,7 @@ func (v *ViewChanger) checkIfTimeout(now time.Time) {
248248
v.Logger.Debugf("Node %d is calling sync because it got a view change timeout", v.SelfID)
249249
v.Synchronizer.Sync()
250250
v.StartViewChange(v.currView, false) // don't stop the view, the sync maybe created a good view
251+
return true
251252
}
252253

253254
func (v *ViewChanger) processMsg(sender uint64, m *protos.Message) {
@@ -1175,13 +1176,16 @@ func (v *ViewChanger) commitInFlightProposal(proposal *protos.Proposal) (success
11751176

11761177
v.Logger.Debugf("Node %d is creating a view for the in flight proposal", v.SelfID)
11771178

1179+
inFlightViewNum := proposalMD.ViewId
1180+
inFlightViewLatestSeq := proposalMD.LatestSequence
1181+
11781182
v.inFlightViewLock.Lock()
1179-
v.inFlightView = &View{
1183+
inFlightView := &View{
11801184
RetrieveCheckpoint: v.Checkpoint.Get,
11811185
DecisionsPerLeader: v.DecisionsPerLeader,
11821186
SelfID: v.SelfID,
11831187
N: v.N,
1184-
Number: proposalMD.ViewId,
1188+
Number: inFlightViewNum,
11851189
LeaderID: v.SelfID, // so that no byzantine leader will cause a complain
11861190
Quorum: v.quorum,
11871191
Decider: v,
@@ -1191,13 +1195,13 @@ func (v *ViewChanger) commitInFlightProposal(proposal *protos.Proposal) (success
11911195
Comm: v.Comm,
11921196
Verifier: v.Verifier,
11931197
Signer: v.Signer,
1194-
ProposalSequence: proposalMD.LatestSequence,
1198+
ProposalSequence: inFlightViewLatestSeq,
11951199
State: v.State,
11961200
InMsgQSize: v.InMsqQSize,
11971201
ViewSequences: v.ViewSequences,
11981202
Phase: PREPARED,
11991203
}
1200-
1204+
v.inFlightView = inFlightView
12011205
v.inFlightView.inFlightProposal = &types.Proposal{
12021206
VerificationSequence: int64(proposal.VerificationSequence),
12031207
Metadata: proposal.Metadata,
@@ -1220,21 +1224,34 @@ func (v *ViewChanger) commitInFlightProposal(proposal *protos.Proposal) (success
12201224
},
12211225
}
12221226

1223-
v.inFlightView.Start()
1227+
inFlightView.Start()
1228+
defer inFlightView.Abort()
1229+
12241230
v.inFlightViewLock.Unlock()
12251231

12261232
v.Logger.Debugf("Node %d started a view for the in flight proposal", v.SelfID)
12271233

1228-
select { // wait for view to finish
1229-
case <-v.inFlightDecideChan:
1230-
case <-v.inFlightSyncChan:
1231-
case <-v.stopChan:
1232-
case now := <-v.Ticker:
1233-
v.lastTick = now
1234-
v.checkIfTimeout(now)
1234+
// wait for view to finish or time out
1235+
for {
1236+
select {
1237+
case <-v.inFlightDecideChan:
1238+
v.Logger.Infof("In-flight view %d with latest sequence %d has committed a decision", inFlightViewNum, inFlightViewLatestSeq)
1239+
return true
1240+
case <-v.inFlightSyncChan:
1241+
v.Logger.Infof("In-flight view %d with latest sequence %d has asked to sync", inFlightViewNum, inFlightViewLatestSeq)
1242+
return false
1243+
case now := <-v.Ticker:
1244+
v.lastTick = now
1245+
if v.checkIfTimeout(now) {
1246+
v.Logger.Infof("Timeout expired waiting on In-flight %d with latest sequence view to commit %d", inFlightViewNum, inFlightViewLatestSeq)
1247+
return false
1248+
}
1249+
case <-v.stopChan:
1250+
v.Logger.Infof("View changer was instructed to stop")
1251+
return false
1252+
}
12351253
}
12361254

1237-
v.inFlightView.Abort()
12381255
return true
12391256
}
12401257

test/basic_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,150 @@ func TestMigrateToBlacklistAndBackAgain(t *testing.T) {
16571657
})
16581658
}
16591659

1660+
func TestNodeInFlightFails(t *testing.T) {
1661+
t.Parallel()
1662+
network := make(Network)
1663+
defer network.Shutdown()
1664+
1665+
testDir, err := ioutil.TempDir("", t.Name())
1666+
assert.NoErrorf(t, err, "generate temporary test dir")
1667+
defer os.RemoveAll(testDir)
1668+
1669+
numberOfNodes := 4
1670+
nodes := make([]*App, 0)
1671+
1672+
var failedViewChange sync.WaitGroup
1673+
failedViewChange.Add(1)
1674+
1675+
var timeoutExpiredWG sync.WaitGroup
1676+
timeoutExpiredWG.Add(1)
1677+
1678+
var collectPreparesWG sync.WaitGroup
1679+
collectPreparesWG.Add(4)
1680+
1681+
var inFlightCommit sync.WaitGroup
1682+
inFlightCommit.Add(2)
1683+
1684+
var blockCommits uint32
1685+
var blockCommitsForLastNode uint32
1686+
1687+
for i := 1; i <= numberOfNodes; i++ {
1688+
n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
1689+
n.Consensus.Config.SyncOnStart = false // Prevent last node from syncing without a reason
1690+
n.Consensus.Config.ViewChangeTimeout = time.Second * 10 // Make view change faster to speedup the test
1691+
n.Consensus.Config.LeaderHeartbeatTimeout = time.Second * 10 // Force view change sooner to speedup test
1692+
n.Consensus.Config.RequestForwardTimeout = time.Second * 25 // Prevent first node to forward after view change
1693+
n.Consensus.Config.RequestComplainTimeout = time.Second * 30 // Must be bigger than the forward timeout
1694+
n.Consensus.Config.SpeedUpViewChange = false
1695+
nodes = append(nodes, n)
1696+
1697+
baseLogger := n.logger.Desugar()
1698+
1699+
id := n.ID
1700+
n.Consensus.Logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
1701+
if strings.Contains(entry.Message, "collected 2 prepares") && atomic.LoadUint32(&blockCommits) == 1 {
1702+
collectPreparesWG.Done()
1703+
}
1704+
1705+
if strings.Contains(entry.Message, "In-flight view 0 with latest sequence 1 has committed a decision") && (id == 2 || id == 3) {
1706+
inFlightCommit.Done()
1707+
}
1708+
1709+
if strings.Contains(entry.Message, "Timeout expired waiting on In-flight 0 with latest sequence view to commit 1") && id == 4 {
1710+
timeoutExpiredWG.Done()
1711+
}
1712+
1713+
if strings.Contains(entry.Message, "Node 4 was unable to commit the in flight proposal, not changing the view") && id == 4 {
1714+
failedViewChange.Done()
1715+
}
1716+
1717+
return nil
1718+
})).Sugar()
1719+
1720+
// Make all nodes but the last node lose commits per the 'blockCommits' flag
1721+
if i == 4 {
1722+
continue
1723+
}
1724+
n.LoseMessages(func(msg *smartbftprotos.Message) bool {
1725+
if msg.GetCommit() == nil {
1726+
return false
1727+
}
1728+
return atomic.LoadUint32(&blockCommits) == 1
1729+
})
1730+
}
1731+
1732+
// However the last node never receives any commits.
1733+
nodes[len(nodes)-1].LoseMessages(func(msg *smartbftprotos.Message) bool {
1734+
return msg.GetCommit() != nil && atomic.LoadUint32(&blockCommitsForLastNode) == 1
1735+
})
1736+
1737+
startNodes(nodes, &network)
1738+
1739+
atomic.StoreUint32(&blockCommits, 1)
1740+
atomic.StoreUint32(&blockCommitsForLastNode, 1)
1741+
1742+
nodes[0].Submit(Request{ID: "first request", ClientID: "alice"})
1743+
1744+
// Wait for prepares to be collected
1745+
collectPreparesWG.Wait()
1746+
1747+
// Ensure no one commits a block because of not enough commits gathered
1748+
var doNotCommitWG sync.WaitGroup
1749+
doNotCommitWG.Add(numberOfNodes)
1750+
1751+
for i := 0; i < numberOfNodes; i++ {
1752+
go func(deliverChannel chan *AppRecord) {
1753+
defer doNotCommitWG.Done()
1754+
select {
1755+
case <-deliverChannel:
1756+
assert.Fail(t, "should not have committed because first commit message should have been lost")
1757+
case <-time.After(time.Second):
1758+
}
1759+
}(nodes[i].Delivered)
1760+
}
1761+
doNotCommitWG.Wait()
1762+
1763+
// Re-enable commit messages to flow to all nodes but the last node
1764+
atomic.StoreUint32(&blockCommits, 0)
1765+
1766+
// Disconnect leader to force view change
1767+
nodes[0].Disconnect()
1768+
1769+
// Wait for view change and ensure in-flight view has committed the previous decision
1770+
inFlightCommit.Wait()
1771+
1772+
// Re-connect the leader
1773+
nodes[0].Connect()
1774+
1775+
// Ensure that only nodes 1 to 3 have committed in-flight
1776+
d1 := <-nodes[0].Delivered
1777+
d2 := <-nodes[1].Delivered
1778+
d3 := <-nodes[2].Delivered
1779+
select {
1780+
case <-nodes[3].Delivered:
1781+
t.Fatalf("Node 4 has committed but shouldn't have")
1782+
case <-time.After(time.Second):
1783+
}
1784+
1785+
// Ensure all nodes that committed, committed the same decision
1786+
assert.Equal(t, d1, d2)
1787+
assert.Equal(t, d2, d3)
1788+
1789+
// Wait for in-flight view of view changer to time out
1790+
timeoutExpiredWG.Wait()
1791+
// Re-enable commits to flow to the last node
1792+
atomic.StoreUint32(&blockCommitsForLastNode, 0)
1793+
// Ensure the last node failed changing the view
1794+
failedViewChange.Wait()
1795+
// However, it eventually syncs successfully and delivers the proposal
1796+
select {
1797+
case d4 := <-nodes[3].Delivered:
1798+
assert.Equal(t, d3, d4)
1799+
case <-time.After(time.Second * 30):
1800+
t.Fatalf("Didn't commit decision in a timely manner")
1801+
}
1802+
}
1803+
16601804
func TestBlacklistAndRedemption(t *testing.T) {
16611805
t.Parallel()
16621806
network := make(Network)

test/network.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ func (node *Node) serve() {
181181
node.RUnlock()
182182
switch msg := m.message.(type) {
183183
case *smartbftprotos.Message:
184+
if node.app != nil && node.app.messageLost != nil && node.app.messageLost(msg) {
185+
continue
186+
}
184187
handler.HandleMessage(uint64(m.from), msg)
185188
default:
186189
handler.HandleRequest(uint64(m.from), msg.(*FwdMessage).Payload)

test/test_app.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type App struct {
5959
logger *zap.SugaredLogger
6060
lastRecord lastRecord
6161
verificationSeq uint64
62+
messageLost func(*smartbftprotos.Message) bool
6263
}
6364

6465
type lastRecord struct {
@@ -180,6 +181,10 @@ func (a *App) ClearMutateSend(target uint64) {
180181
delete(a.Node.peerMutatingFunc, target)
181182
}
182183

184+
func (a *App) LoseMessages(filter func(*smartbftprotos.Message) bool) {
185+
a.messageLost = filter
186+
}
187+
183188
// RequestID returns info about the given request
184189
func (a *App) RequestID(req []byte) types.RequestInfo {
185190
txn := requestFromBytes(req)
@@ -245,7 +250,7 @@ func (a *App) Sign([]byte) []byte {
245250

246251
// SignProposal signs on the given proposal
247252
func (a *App) SignProposal(_ types.Proposal, aux []byte) *types.Signature {
248-
if len(aux) == 0 && len(a.Node.n) > 1 {
253+
if len(aux) == 0 && len(a.Node.n) > 1 && a.messageLost == nil {
249254
panic(fmt.Sprintf("didn't receive prepares from anyone, n=%d", len(a.Node.n)))
250255
}
251256
return &types.Signature{ID: a.ID, Msg: aux}

0 commit comments

Comments
 (0)