Skip to content

Commit 605b7d4

Browse files
committed
feat: Consensus lag detection
1 parent 3691db4 commit 605b7d4

File tree

6 files changed

+105
-78
lines changed

6 files changed

+105
-78
lines changed

consensus/driver/driver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,18 @@ func (d *Driver[V, H, A]) execute(
176176
}
177177

178178
return true, nil
179+
180+
case *actions.TriggerSync:
181+
d.triggerSync(*action)
179182
}
180183
}
181184
return false, nil
182185
}
183186

187+
func (d *Driver[V, H, A]) triggerSync(sync actions.TriggerSync) {
188+
// TODO: Implement this
189+
}
190+
184191
func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) {
185192
d.scheduledTms[timeout] = time.AfterFunc(d.getTimeout(timeout.Step, timeout.Round), func() {
186193
select {

consensus/tendermint/process.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,36 @@ func (s *stateMachine[V, H, A]) ProcessPrecommit(p *types.Precommit[H, A]) []act
3838
if !s.voteCounter.AddPrecommit(p) || !s.isHeightStarted {
3939
return nil
4040
}
41+
42+
if s.hasFuturePrecommitQuorum(p) {
43+
return s.triggerSync(p)
44+
}
45+
4146
return s.processMessage(p, (*wal.WALPrecommit[H, A])(p))
4247
}
4348

49+
func (s *stateMachine[V, H, A]) hasFuturePrecommitQuorum(p *types.Precommit[H, A]) bool {
50+
return p.ID != nil &&
51+
p.Height > s.state.height &&
52+
p.Height > s.lastTriggerSync &&
53+
s.voteCounter.HasFuturePrecommitQuorum(p.Height, p.Round, p.ID)
54+
}
55+
56+
func (s *stateMachine[V, H, A]) triggerSync(p *types.Precommit[H, A]) []actions.Action[V, H, A] {
57+
syncStart := max(s.lastTriggerSync+1, s.state.height)
58+
s.lastTriggerSync = p.Height
59+
60+
return []actions.Action[V, H, A]{
61+
&actions.WriteWAL[V, H, A]{
62+
Entry: (*wal.WALPrecommit[H, A])(p),
63+
},
64+
&actions.TriggerSync{
65+
Start: syncStart,
66+
End: p.Height,
67+
},
68+
}
69+
}
70+
4471
func (s *stateMachine[V, H, A]) processMessage(
4572
msg types.Message[V, H, A],
4673
walEntry wal.Entry[V, H, A],

consensus/tendermint/tendermint.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type stateMachine[V types.Hashable[H], H types.Hash, A types.Addr] struct {
4040
voteCounter votecounter.VoteCounter[V, H, A]
4141
application Application[V, H]
4242
isHeightStarted bool
43+
lastTriggerSync types.Height
4344
}
4445

4546
type state[V types.Hashable[H], H types.Hash] struct {

consensus/types/actions/actions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@ type ScheduleTimeout types.Timeout
2424

2525
type Commit[V types.Hashable[H], H types.Hash, A types.Addr] types.Proposal[V, H, A]
2626

27+
type TriggerSync struct {
28+
Start, End types.Height
29+
}
30+
2731
func (a *WriteWAL[V, H, A]) RequiresWALFlush() bool { return false }
2832
func (a *BroadcastProposal[V, H, A]) RequiresWALFlush() bool { return true }
2933
func (a *BroadcastPrevote[H, A]) RequiresWALFlush() bool { return true }
3034
func (a *BroadcastPrecommit[H, A]) RequiresWALFlush() bool { return true }
3135
func (a *ScheduleTimeout) RequiresWALFlush() bool { return false }
3236
func (a *Commit[V, H, A]) RequiresWALFlush() bool { return true }
37+
func (a *TriggerSync) RequiresWALFlush() bool { return false }
3338

3439
func (a *WriteWAL[V, H, A]) isTendermintAction() {}
3540
func (a *BroadcastProposal[V, H, A]) isTendermintAction() {}
3641
func (a *BroadcastPrevote[H, A]) isTendermintAction() {}
3742
func (a *BroadcastPrecommit[H, A]) isTendermintAction() {}
3843
func (a *ScheduleTimeout) isTendermintAction() {}
3944
func (a *Commit[V, H, A]) isTendermintAction() {}
45+
func (a *TriggerSync) isTendermintAction() {}

consensus/votecounter/vote_counter.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package votecounter
22

33
import (
4-
"iter"
5-
64
"github.com/NethermindEth/juno/consensus/types"
75
"github.com/NethermindEth/juno/utils"
86
)
@@ -42,23 +40,6 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](validators Validators[
4240
}
4341
}
4442

45-
func (v *VoteCounter[V, H, A]) LoadFromMessages(messages iter.Seq2[types.Message[V, H, A], error]) error {
46-
for msg, err := range messages {
47-
if err != nil {
48-
return err
49-
}
50-
switch msg := msg.(type) {
51-
case *types.Proposal[V, H, A]:
52-
v.AddProposal(msg)
53-
case *types.Prevote[H, A]:
54-
v.AddPrevote(msg)
55-
case *types.Precommit[H, A]:
56-
v.AddPrecommit(msg)
57-
}
58-
}
59-
return nil
60-
}
61-
6243
func (v *VoteCounter[V, H, A]) StartNewHeight() {
6344
v.currentHeight++
6445
v.totalVotingPower = v.validators.TotalVotingPower(v.currentHeight)
@@ -74,23 +55,26 @@ func (v *VoteCounter[V, H, A]) StartNewHeight() {
7455
}
7556
}
7657

77-
func (v *VoteCounter[V, H, A]) getRoundDataAndVotingPower(header *types.MessageHeader[A]) (*roundData[V, H, A], types.VotingPower, bool) {
78-
if header.Height < v.currentHeight {
79-
return nil, 0, false
58+
func (v *VoteCounter[V, H, A]) getRoundData(
59+
height types.Height,
60+
round types.Round,
61+
) (*roundData[V, H, A], bool) {
62+
if height < v.currentHeight {
63+
return nil, false
8064
}
8165

82-
votingPower := v.validators.ValidatorVotingPower(header.Height, &header.Sender)
83-
84-
if header.Height == v.currentHeight {
85-
return getOrCreateRoundData(v.roundData, header.Round), votingPower, true
66+
var roundData roundMap[V, H, A]
67+
if height == v.currentHeight {
68+
roundData = v.roundData
69+
} else {
70+
var ok bool
71+
if roundData, ok = v.futureMessages[height]; !ok {
72+
roundData = make(roundMap[V, H, A])
73+
v.futureMessages[height] = roundData
74+
}
8675
}
8776

88-
futureRoundMap, ok := v.futureMessages[header.Height]
89-
if !ok {
90-
futureRoundMap = make(roundMap[V, H, A])
91-
v.futureMessages[header.Height] = futureRoundMap
92-
}
93-
return getOrCreateRoundData(futureRoundMap, header.Round), votingPower, true
77+
return getOrCreateRoundData(roundData, round), true
9478
}
9579

9680
func getOrCreateRoundData[V types.Hashable[H], H types.Hash, A types.Addr](
@@ -106,7 +90,7 @@ func getOrCreateRoundData[V types.Hashable[H], H types.Hash, A types.Addr](
10690
}
10791

10892
func (v *VoteCounter[V, H, A]) AddProposal(proposal *types.Proposal[V, H, A]) bool {
109-
roundData, votingPower, ok := v.getRoundDataAndVotingPower(&proposal.MessageHeader)
93+
roundData, ok := v.getRoundData(proposal.Height, proposal.Round)
11094
if !ok {
11195
return false
11296
}
@@ -115,24 +99,30 @@ func (v *VoteCounter[V, H, A]) AddProposal(proposal *types.Proposal[V, H, A]) bo
11599
return false
116100
}
117101

102+
votingPower := v.validators.ValidatorVotingPower(proposal.Height, &proposal.Sender)
103+
118104
return roundData.setProposal(proposal, votingPower)
119105
}
120106

121107
func (v *VoteCounter[V, H, A]) AddPrevote(prevote *types.Prevote[H, A]) bool {
122-
roundData, votingPower, ok := v.getRoundDataAndVotingPower(&prevote.MessageHeader)
108+
roundData, ok := v.getRoundData(prevote.Height, prevote.Round)
123109
if !ok {
124110
return false
125111
}
126112

113+
votingPower := v.validators.ValidatorVotingPower(prevote.Height, &prevote.Sender)
114+
127115
return roundData.addVote((*types.Vote[H, A])(prevote), votingPower, Prevote)
128116
}
129117

130118
func (v *VoteCounter[V, H, A]) AddPrecommit(precommit *types.Precommit[H, A]) bool {
131-
roundData, votingPower, ok := v.getRoundDataAndVotingPower(&precommit.MessageHeader)
119+
roundData, ok := v.getRoundData(precommit.Height, precommit.Round)
132120
if !ok {
133121
return false
134122
}
135123

124+
votingPower := v.validators.ValidatorVotingPower(precommit.Height, &precommit.Sender)
125+
136126
return roundData.addVote((*types.Vote[H, A])(precommit), votingPower, Precommit)
137127
}
138128

@@ -163,6 +153,19 @@ func (v *VoteCounter[V, H, A]) HasQuorumForAny(round types.Round, voteType VoteT
163153
return roundData.countAny(voteType) >= v.quorumVotingPower
164154
}
165155

156+
func (v *VoteCounter[V, H, A]) HasFuturePrecommitQuorum(
157+
height types.Height,
158+
round types.Round,
159+
id *H,
160+
) bool {
161+
roundData, ok := v.getRoundData(height, round)
162+
if !ok {
163+
return false
164+
}
165+
166+
return roundData.countVote(Precommit, id) >= v.quorumVotingPower
167+
}
168+
166169
func (v *VoteCounter[V, H, A]) HasNonFaultyFutureMessage(round types.Round) bool {
167170
roundData, ok := v.roundData[round]
168171
if !ok {

consensus/votecounter/vote_counter_test.go

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package votecounter
22

33
import (
4-
"iter"
54
"testing"
65

76
"github.com/NethermindEth/juno/consensus/starknet"
87
"github.com/NethermindEth/juno/consensus/types"
98
"github.com/NethermindEth/juno/core/felt"
109
"github.com/stretchr/testify/assert"
11-
"github.com/stretchr/testify/require"
1210
)
1311

1412
func assertVoteCounter(
@@ -23,12 +21,34 @@ func assertVoteCounter(
2321
f := f(voteCounter.validators.TotalVotingPower(testHeight))
2422
q := q(voteCounter.validators.TotalVotingPower(testHeight))
2523

26-
assert.Equal(t, countVotes(isVotingNil, isPrevote) >= q, voteCounter.HasQuorumForVote(testRound, Prevote, nil))
27-
assert.Equal(t, countVotes(isVotingNil, isPrecommit) >= q, voteCounter.HasQuorumForVote(testRound, Precommit, nil))
24+
assert.Equal(
25+
t,
26+
countVotes(isVotingNil, isPrevote) >= q,
27+
voteCounter.HasQuorumForVote(testRound, Prevote, nil),
28+
)
29+
assert.Equal(
30+
t,
31+
countVotes(isVotingNil, isPrecommit) >= q,
32+
voteCounter.HasQuorumForVote(testRound, Precommit, nil),
33+
)
2834

2935
for _, id := range allIDs {
30-
assert.Equal(t, countVotes(isVotingID(id), isPrevote) >= q, voteCounter.HasQuorumForVote(testRound, Prevote, &id))
31-
assert.Equal(t, countVotes(isVotingID(id), isPrecommit) >= q, voteCounter.HasQuorumForVote(testRound, Precommit, &id))
36+
assert.Equal(
37+
t,
38+
countVotes(isVotingID(id), isPrevote) >= q,
39+
voteCounter.HasQuorumForVote(testRound, Prevote, &id),
40+
)
41+
assert.Equal(
42+
t,
43+
countVotes(isVotingID(id), isPrecommit) >= q,
44+
voteCounter.HasQuorumForVote(testRound, Precommit, &id),
45+
)
46+
47+
assert.Equal(
48+
t,
49+
countVotes(isVotingID(id), isPrecommit) >= q,
50+
voteCounter.HasFuturePrecommitQuorum(testHeight+1, testRound, &id),
51+
)
3252
}
3353

3454
assert.Equal(t, countVotes(isPrevote) >= q, voteCounter.HasQuorumForAny(testRound, Prevote))
@@ -37,30 +57,6 @@ func assertVoteCounter(
3757
assert.Equal(t, countVotes() > f, voteCounter.HasNonFaultyFutureMessage(testRound))
3858
}
3959

40-
func convertToMessages(t *testing.T, testCases []testCase) iter.Seq2[starknet.Message, error] {
41-
return func(yield func(starknet.Message, error) bool) {
42-
t.Helper()
43-
for _, testCase := range testCases {
44-
var message starknet.Message
45-
switch testCase := testCase.(type) {
46-
case *proposalTestCase:
47-
message = &testCase.proposal
48-
case *voteTestCase:
49-
switch testCase.voteType {
50-
case Prevote:
51-
message = (*starknet.Prevote)(&testCase.vote)
52-
case Precommit:
53-
message = (*starknet.Precommit)(&testCase.vote)
54-
}
55-
}
56-
assert.NotNil(t, message)
57-
if !yield(message, nil) {
58-
return
59-
}
60-
}
61-
}
62-
}
63-
6460
func testMultipleHeights[T any](
6561
t *testing.T,
6662
message *T,
@@ -89,19 +85,6 @@ func testMultipleHeights[T any](
8985
func TestVoteCounter(t *testing.T) {
9086
tests := simpleTestScenarios
9187

92-
t.Run("LoadFromMessages", func(t *testing.T) {
93-
for i := range tests {
94-
t.Run(tests[i].String(), func(t *testing.T) {
95-
tests := tests[:i+1]
96-
messages := convertToMessages(t, tests)
97-
voteCounter := New[starknet.Value](simpleMockValidator, testHeight)
98-
99-
require.NoError(t, voteCounter.LoadFromMessages(messages))
100-
assertVoteCounter(t, &voteCounter, tests, simpleVotingPower)
101-
})
102-
}
103-
})
104-
10588
t.Run("Add", func(t *testing.T) {
10689
var firstProposal *starknet.Proposal
10790

0 commit comments

Comments
 (0)