Skip to content

Commit 21fa215

Browse files
committed
feat: Rewrite WAL and avoid missing entries
1 parent 2773af1 commit 21fa215

27 files changed

+541
-482
lines changed

consensus/consensus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func Init(
5454

5555
proposalStore := proposal.ProposalStore[starknet.Hash]{}
5656
proposer := proposer.New(logger, &builder, &proposalStore, *nodeAddress, toValue)
57-
stateMachine := tendermint.New(tendermintDB, logger, *nodeAddress, proposer, validators, currentHeight)
57+
stateMachine := tendermint.New(logger, *nodeAddress, proposer, validators, currentHeight)
5858

5959
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes, bootstrapPeersFn)
6060

consensus/db/db.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ type walMsgCount uint32
3333
// 2. Right before we broadcast a message
3434
//
3535
// We call Delete when we start a new height and commit a block
36-
//
37-
//go:generate mockgen -destination=../mocks/mock_db.go -package=mocks github.com/NethermindEth/juno/consensus/db TendermintDB
3836
type TendermintDB[V types.Hashable[H], H types.Hash, A types.Addr] interface {
3937
// Flush writes the accumulated batch operations to the underlying database.
4038
Flush() error

consensus/driver/driver.go

Lines changed: 67 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package driver
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/NethermindEth/juno/consensus/db"
@@ -62,16 +63,40 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
6263
}
6364
}()
6465

65-
d.stateMachine.ReplayWAL()
66+
if err := d.replay(ctx); err != nil {
67+
return err
68+
}
69+
70+
return d.listen(ctx)
71+
}
72+
73+
func (d *Driver[V, H, A]) replay(ctx context.Context) error {
74+
for walEntry, err := range d.db.LoadAllEntries() {
75+
if err != nil {
76+
return fmt.Errorf("failed to load WAL entries: %w", err)
77+
}
6678

79+
if _, err := d.execute(ctx, true, d.stateMachine.ProcessWAL(walEntry)); err != nil {
80+
return err
81+
}
82+
}
83+
84+
return nil
85+
}
86+
87+
func (d *Driver[V, H, A]) listen(ctx context.Context) error {
6788
for {
6889
select {
6990
case <-ctx.Done():
7091
return nil
7192
default:
7293
}
94+
7395
actions := d.stateMachine.ProcessStart(0)
74-
isCommitted := d.execute(ctx, actions)
96+
isCommitted, err := d.execute(ctx, false, actions)
97+
if err != nil {
98+
return err
99+
}
75100

76101
// Todo: check message signature everytime a message is received.
77102
// For the time being it can be assumed the signature is correct.
@@ -100,46 +125,69 @@ func (d *Driver[V, H, A]) Run(ctx context.Context) error {
100125
actions = d.stateMachine.ProcessPrecommit(p)
101126
}
102127

103-
isCommitted = d.execute(ctx, actions)
128+
isCommitted, err = d.execute(ctx, false, actions)
129+
if err != nil {
130+
return err
131+
}
104132
}
105133
}
106134
}
107135

108136
// This function executes the actions returned by the stateMachine.
109137
// It returns true if a commit action was executed. This is to notify the caller to start a new height with round 0.
138+
// Note: `WriteWAL` actions are generated as part of processing the event itself, so there's no
139+
// need to write them to the WAL again here. `isReplaying` is used to disable the writing of WAL.
110140
func (d *Driver[V, H, A]) execute(
111141
ctx context.Context,
112-
executingActions []actions.Action[V, H, A],
113-
) (isCommitted bool) {
114-
for _, action := range executingActions {
142+
isReplaying bool,
143+
resultActions []actions.Action[V, H, A],
144+
) (isCommitted bool, err error) {
145+
for _, action := range resultActions {
146+
if !isReplaying && action.RequiresWALFlush() {
147+
if err := d.db.Flush(); err != nil {
148+
return false, fmt.Errorf("failed to flush WAL: %w", err)
149+
}
150+
}
151+
115152
switch action := action.(type) {
153+
case *actions.WriteWAL[V, H, A]:
154+
if !isReplaying {
155+
if err := d.db.SetWALEntry(action.Entry); err != nil {
156+
return false, fmt.Errorf("failed to write WAL: %w", err)
157+
}
158+
}
159+
116160
case *actions.BroadcastProposal[V, H, A]:
117161
d.broadcasters.ProposalBroadcaster.Broadcast(ctx, (*types.Proposal[V, H, A])(action))
162+
118163
case *actions.BroadcastPrevote[H, A]:
119164
d.broadcasters.PrevoteBroadcaster.Broadcast(ctx, (*types.Prevote[H, A])(action))
165+
120166
case *actions.BroadcastPrecommit[H, A]:
121167
d.broadcasters.PrecommitBroadcaster.Broadcast(ctx, (*types.Precommit[H, A])(action))
168+
122169
case *actions.ScheduleTimeout:
123-
d.scheduledTms[types.Timeout(*action)] = time.AfterFunc(d.getTimeout(action.Step, action.Round), func() {
124-
select {
125-
case <-ctx.Done():
126-
case d.timeoutsCh <- types.Timeout(*action):
127-
}
128-
})
129-
case *actions.Commit[V, H, A]:
130-
if err := d.db.Flush(); err != nil {
131-
d.log.Fatalf("failed to flush WAL during commit", "height", action.Height, "round", action.Round, "err", err)
132-
}
170+
d.setTimeout(ctx, types.Timeout(*action))
133171

172+
case *actions.Commit[V, H, A]:
134173
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
135174
d.commitListener.OnCommit(ctx, action.Height, *action.Value)
136175

137176
if err := d.db.DeleteWALEntries(action.Height); err != nil {
138-
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)
177+
return true, fmt.Errorf("failed to delete WAL messages during commit: %w", err)
139178
}
140179

141-
return true
180+
return true, nil
142181
}
143182
}
144-
return false
183+
return false, nil
184+
}
185+
186+
func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) {
187+
d.scheduledTms[timeout] = time.AfterFunc(d.getTimeout(timeout.Step, timeout.Round), func() {
188+
select {
189+
case <-ctx.Done():
190+
case d.timeoutsCh <- timeout:
191+
}
192+
})
145193
}

consensus/driver/driver_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ func TestDriver(t *testing.T) {
157157
stateMachine := mocks.NewMockStateMachine[starknet.Value, starknet.Hash, starknet.Address](
158158
ctrl,
159159
)
160-
stateMachine.EXPECT().ReplayWAL().AnyTimes().Return() // ignore WAL replay logic here
161160

162161
commitAction := starknet.Commit(getRandProposal(random))
163162

consensus/mocks/mock_db.go

Lines changed: 0 additions & 99 deletions
This file was deleted.

consensus/mocks/mock_state_machine.go

Lines changed: 9 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/starknet/starknet.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ type (
2525
MessageHeader = types.MessageHeader[Address]
2626

2727
Action = actions.Action[Value, Hash, Address]
28+
WriteWAL = actions.WriteWAL[Value, Hash, Address]
2829
BroadcastProposal = actions.BroadcastProposal[Value, Hash, Address]
2930
BroadcastPrevote = actions.BroadcastPrevote[Hash, Address]
3031
BroadcastPrecommit = actions.BroadcastPrecommit[Hash, Address]
3132
Commit = actions.Commit[Value, Hash, Address]
3233

3334
WALEntry = wal.Entry[Value, Hash, Address]
34-
WALProposal = wal.WALProposal[Value, Hash, Address]
35-
WALPrevote = wal.WALPrevote[Hash, Address]
36-
WALPrecommit = wal.WALPrecommit[Hash, Address]
37-
WALTimeout = wal.WALTimeout
35+
WALProposal = wal.Proposal[Value, Hash, Address]
36+
WALPrevote = wal.Prevote[Hash, Address]
37+
WALPrecommit = wal.Precommit[Hash, Address]
38+
WALTimeout = wal.Timeout
3839
)

consensus/sync/sync_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestSync(t *testing.T) {
6565
cancel()
6666
})
6767

68-
stateMachine := tendermint.New(tmDB, logger, nodeAddr, mockApp, allNodes, types.Height(0))
68+
stateMachine := tendermint.New(logger, nodeAddr, mockApp, allNodes, types.Height(0))
6969

7070
proposalCh := make(chan *starknet.Proposal)
7171
prevoteCh := make(chan *starknet.Prevote)

0 commit comments

Comments
 (0)