Skip to content

Commit 2463919

Browse files
committed
Add last submitted da heights to raft
1 parent 695324e commit 2463919

File tree

9 files changed

+97
-36
lines changed

9 files changed

+97
-36
lines changed

block/internal/cache/manager.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ type PendingManager interface {
8181
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
8282
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
8383
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
84+
GetLastSubmittedHeaderHeight() uint64
8485
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
86+
GetLastSubmittedDataHeight() uint64
8587
NumPendingHeaders() uint64
8688
NumPendingData() uint64
8789
}
@@ -347,10 +349,18 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat
347349
return signedDataList, nil
348350
}
349351

352+
func (m *implementation) GetLastSubmittedHeaderHeight() uint64 {
353+
return m.pendingHeaders.GetLastSubmittedDataHeight()
354+
}
355+
350356
func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) {
351357
m.pendingHeaders.SetLastSubmittedHeaderHeight(ctx, height)
352358
}
353359

360+
func (m *implementation) GetLastSubmittedDataHeight() uint64 {
361+
return m.pendingData.GetLastSubmittedDataHeight()
362+
}
363+
354364
func (m *implementation) SetLastSubmittedDataHeight(ctx context.Context, height uint64) {
355365
m.pendingData.SetLastSubmittedDataHeight(ctx, height)
356366
}

block/internal/cache/pending_base.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func (pb *pendingBase[T]) numPending() uint64 {
7171
return height - pb.lastHeight.Load()
7272
}
7373

74+
func (pb *pendingBase[T]) getLastSubmittedHeight() uint64 {
75+
return pb.lastHeight.Load()
76+
}
77+
7478
func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSubmittedHeight uint64) {
7579
lsh := pb.lastHeight.Load()
7680
if newLastSubmittedHeight > lsh && pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {

block/internal/cache/pending_data.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/rs/zerolog"
78

@@ -28,8 +29,17 @@ type PendingData struct {
2829
base *pendingBase[*types.Data]
2930
}
3031

32+
var errInFlightData = errors.New("inflight data")
33+
3134
func fetchData(ctx context.Context, store store.Store, height uint64) (*types.Data, error) {
3235
_, data, err := store.GetBlockData(ctx, height)
36+
if err != nil {
37+
return nil, err
38+
}
39+
// in the executor, WIP data is temporary stored. skip them until the process is completed
40+
if data.Height() == 0 {
41+
return nil, errInFlightData
42+
}
3343
return data, err
3444
}
3545

@@ -58,3 +68,6 @@ func (pd *PendingData) NumPendingData() uint64 {
5868
func (pd *PendingData) SetLastSubmittedDataHeight(ctx context.Context, newLastSubmittedDataHeight uint64) {
5969
pd.base.setLastSubmittedHeight(ctx, newLastSubmittedDataHeight)
6070
}
71+
func (pd *PendingData) GetLastSubmittedDataHeight() uint64 {
72+
return pd.base.getLastSubmittedHeight()
73+
}

block/internal/cache/pending_headers.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/rs/zerolog"
78

@@ -25,8 +26,17 @@ type PendingHeaders struct {
2526
base *pendingBase[*types.SignedHeader]
2627
}
2728

29+
var errInFlightHeader = errors.New("inflight header")
30+
2831
func fetchSignedHeader(ctx context.Context, store storepkg.Store, height uint64) (*types.SignedHeader, error) {
2932
header, err := store.GetHeader(ctx, height)
33+
if err != nil {
34+
return nil, err
35+
}
36+
// in the executor, WIP headers are temporary stored. skip them until the process is completed
37+
if header.Height() == 0 {
38+
return nil, errInFlightHeader
39+
}
3040
return header, err
3141
}
3242

@@ -55,3 +65,7 @@ func (ph *PendingHeaders) SetLastSubmittedHeaderHeight(ctx context.Context, newL
5565
func (ph *PendingHeaders) init() error {
5666
return ph.base.init()
5767
}
68+
69+
func (ph *PendingHeaders) GetLastSubmittedDataHeight() uint64 {
70+
return ph.base.getLastSubmittedHeight()
71+
}

block/internal/executing/executor.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -443,17 +443,18 @@ func (e *Executor) produceBlock() error {
443443
}
444444

445445
raftState := &raft.RaftBlockState{
446-
Height: newHeight,
447-
Hash: header.Hash(),
448-
Timestamp: header.BaseHeader.Time,
449-
Header: headerBytes,
450-
Data: dataBytes,
446+
Height: newHeight,
447+
Hash: header.Hash(),
448+
Timestamp: header.BaseHeader.Time,
449+
Header: headerBytes,
450+
Data: dataBytes,
451+
LastSubmittedDAHeaderHeight: e.cache.GetLastSubmittedHeaderHeight(),
452+
LastSubmittedDADataHeight: e.cache.GetLastSubmittedDataHeight(),
451453
}
452454
if err := e.raftNode.Broadcast(e.ctx, raftState); err != nil {
453455
return fmt.Errorf("failed to propose block to raft: %w", err)
454456
}
455457
e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft")
456-
457458
}
458459
if err := batch.Commit(); err != nil {
459460
return fmt.Errorf("failed to commit batch: %w", err)

block/internal/syncing/raft_retriever.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ func (e eventProcessorFn) handle(ctx context.Context, event common.DAHeightEvent
2222
return e(ctx, event)
2323
}
2424

25+
type raftStatePreProcessor func(ctx context.Context, state *raft.RaftBlockState) error
2526
type raftRetriever struct {
26-
raftNode common.RaftNode
27-
wg sync.WaitGroup
28-
logger zerolog.Logger
29-
genesis genesis.Genesis
30-
eventProcessor eventProcessor
27+
raftNode common.RaftNode
28+
wg sync.WaitGroup
29+
logger zerolog.Logger
30+
genesis genesis.Genesis
31+
eventProcessor eventProcessor
32+
raftBlockPreProcessor raftStatePreProcessor
3133

3234
mtx sync.Mutex
3335
cancel context.CancelFunc
@@ -38,12 +40,14 @@ func newRaftRetriever(
3840
genesis genesis.Genesis,
3941
logger zerolog.Logger,
4042
eventProcessor eventProcessor,
43+
raftBlockPostProcessor raftStatePreProcessor,
4144
) *raftRetriever {
4245
return &raftRetriever{
43-
raftNode: raftNode,
44-
genesis: genesis,
45-
logger: logger,
46-
eventProcessor: eventProcessor,
46+
raftNode: raftNode,
47+
genesis: genesis,
48+
logger: logger,
49+
eventProcessor: eventProcessor,
50+
raftBlockPreProcessor: raftBlockPostProcessor,
4751
}
4852
}
4953

@@ -98,6 +102,9 @@ func (r *raftRetriever) raftApplyLoop(ctx context.Context, applyCh <-chan raft.R
98102
// consumeRaftBlock applies a block received from raft consensus
99103
func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *raft.RaftBlockState) error {
100104
r.logger.Debug().Uint64("height", state.Height).Msg("applying raft block")
105+
if err := r.raftBlockPreProcessor(ctx, state); err != nil {
106+
return err
107+
}
101108

102109
// Unmarshal header and data
103110
var header types.SignedHeader
@@ -122,7 +129,7 @@ func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *raft.RaftBl
122129
event := common.DAHeightEvent{
123130
Header: &header,
124131
Data: &data,
125-
DaHeight: 0, // raft events don't have DA height context
132+
DaHeight: 0, // raft events don't have DA height context, yet as DA submission is asynchronous
126133
}
127134
return r.eventProcessor.handle(ctx, event)
128135
}

block/internal/syncing/syncer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/evstack/ev-node/pkg/raft"
1415
pubsub "github.com/libp2p/go-libp2p-pubsub"
1516
"github.com/rs/zerolog"
1617
"golang.org/x/sync/errgroup"
@@ -79,7 +80,7 @@ func NewSyncer(
7980
store store.Store,
8081
exec coreexecutor.Executor,
8182
daClient da.Client,
82-
cache cache.CacheManager,
83+
cache cache.Manager,
8384
metrics *common.Metrics,
8485
config config.Config,
8586
genesis genesis.Genesis,
@@ -108,7 +109,12 @@ func NewSyncer(
108109
logger: logger.With().Str("component", "syncer").Logger(),
109110
}
110111
if raftNode != nil && !reflect.ValueOf(raftNode).IsNil() {
111-
s.raftRetriever = newRaftRetriever(raftNode, genesis, logger, eventProcessorFn(s.pipeEvent))
112+
s.raftRetriever = newRaftRetriever(raftNode, genesis, logger, eventProcessorFn(s.pipeEvent), func(ctx context.Context, state *raft.RaftBlockState) error {
113+
s.logger.Info().Uint64("header_height", state.LastSubmittedDAHeaderHeight).Uint64("data_height", state.LastSubmittedDADataHeight).Msg("+++ received raft block state")
114+
cache.SetLastSubmittedHeaderHeight(ctx, state.LastSubmittedDAHeaderHeight)
115+
cache.SetLastSubmittedDataHeight(ctx, state.LastSubmittedDADataHeight)
116+
return nil
117+
})
112118
}
113119
return s
114120
}

pkg/raft/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import "fmt"
44

55
// RaftBlockState represents a replicated block state
66
type RaftBlockState struct {
7-
Height uint64
8-
Hash []byte
9-
Timestamp uint64
10-
Header []byte
11-
Data []byte
7+
Height uint64 `json:"height"`
8+
LastSubmittedDAHeaderHeight uint64 `json:"last_submitted_da_header_height"`
9+
LastSubmittedDADataHeight uint64 `json:"last_submitted_da_data_height"`
10+
Hash []byte `json:"hash"`
11+
Timestamp uint64 `json:"timestamp"`
12+
Header []byte `json:"header"`
13+
Data []byte `json:"data"`
1214
}
1315

1416
// assertValid checks basic constraints but does not ensure that no gaps exist or chain continuity

test/e2e/failover_e2e_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"os"
1414
"path/filepath"
1515
"slices"
16-
"strconv"
1716
"strings"
1817
"sync"
1918
"sync/atomic"
@@ -230,7 +229,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
230229
}, 2*must(time.ParseDuration(DefaultDABlockTime)), 100*time.Millisecond)
231230

232231
t.Log("+++ Verifying no DA gaps...")
233-
verifyBlocks(t, daStartHeight, lastDABlockNewLeader, jwtSecret, testEndpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight)
232+
verifyDABlocks(t, daStartHeight, lastDABlockNewLeader, jwtSecret, testEndpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight)
234233

235234
// Cleanup processes
236235
clusterNodes.killAll()
@@ -260,13 +259,13 @@ func verifyNoDoubleSigning(t *testing.T, clusterNodes *raftClusterNodes, genesis
260259
require.NoError(t, err)
261260
t.Logf("%s: %v", nodes[0], rsp.Block)
262261
}
263-
//t.FailNow()
262+
t.FailNow()
264263
}
265264
}
266265
}
267266

268-
// verifyBlocks checks that DA block heights form a continuous sequence without gaps
269-
func verifyBlocks(t *testing.T, daStartHeight, lastDABlock uint64, jwtSecret string, daAddress string, genesisHeight, lastEVBlock uint64) {
267+
// verifyDABlocks checks that DA block heights form a continuous sequence without gaps
268+
func verifyDABlocks(t *testing.T, daStartHeight, lastDABlock uint64, jwtSecret string, daAddress string, genesisHeight, lastEVBlock uint64) {
270269
t.Helper()
271270
rpcClient, err := jsonrpc.NewClient(t.Context(), zerolog.Nop(), daAddress, jwtSecret, defaultMaxBlobSize)
272271
require.NoError(t, err)
@@ -287,8 +286,8 @@ func verifyBlocks(t *testing.T, daStartHeight, lastDABlock uint64, jwtSecret str
287286
require.NoError(t, err)
288287

289288
for _, blob := range blobs {
290-
if evHeight, hash := extractBlockHeight(t, blob); evHeight != 0 {
291-
t.Logf("extracting block height from blob (da height %d): %x", daHeight, evHeight)
289+
if evHeight, hash, blobType := extractBlockHeight(t, blob); evHeight != 0 {
290+
t.Logf("extracting block height from blob (da height %d): %4d (%s)", daHeight, evHeight, blobType)
292291
if height, ok := deduplicationCache[hash.String()]; ok {
293292
require.Equal(t, evHeight, height)
294293
continue
@@ -309,16 +308,21 @@ func verifyBlocks(t *testing.T, daStartHeight, lastDABlock uint64, jwtSecret str
309308
}
310309

311310
// extractBlockHeight attempts to decode a blob as SignedHeader or SignedData and extract the block height
312-
func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash) {
311+
func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash, string) {
313312
t.Helper()
313+
if len(blob) == 0 {
314+
t.Log("empty blob, skipping")
315+
return 0, nil, ""
316+
}
314317
var headerPb pb.SignedHeader
315318
if err := proto.Unmarshal(blob, &headerPb); err == nil {
316319
var signedHeader types.SignedHeader
317320
if err := signedHeader.FromProto(&headerPb); err == nil {
318321
if err := signedHeader.Header.ValidateBasic(); err == nil {
319-
return signedHeader.Height(), signedHeader.Hash()
322+
return signedHeader.Height(), signedHeader.Hash(), "header"
320323
} else {
321-
t.Logf("invalid header: %v", err)
324+
jsonBZ, _ := json.MarshalIndent(signedHeader.Header, "", " ")
325+
t.Logf("invalid header: %v: %s", err, string(jsonBZ))
322326
}
323327
} else {
324328
t.Logf("failed to unmarshal signed header: %v", err)
@@ -330,12 +334,12 @@ func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash) {
330334
var signedData types.SignedData
331335
if err := signedData.UnmarshalBinary(blob); err == nil {
332336
if signedData.Metadata != nil {
333-
return signedData.Height(), signedData.Hash()
337+
return signedData.Height(), signedData.Hash(), "data"
334338
}
335339
} else {
336340
t.Logf("failed to unmarshal signed data: %v", err)
337341
}
338-
return 0, nil
342+
return 0, nil, ""
339343
}
340344

341345
func initChain(t *testing.T, sut *SystemUnderTest, workDir string) string {
@@ -399,7 +403,7 @@ func setupRaftSequencerNode(
399403
"--evnode.raft.node_id="+nodeID,
400404
"--evnode.raft.raft_addr="+raftAddr,
401405
"--evnode.raft.raft_dir="+raftDir,
402-
"--evnode.raft.bootstrap="+strconv.FormatBool(bootstrap),
406+
"--evnode.raft.bootstrap=true",
403407
"--evnode.raft.peers="+strings.Join(raftPeers, ","),
404408
"--evnode.raft.snap_count=10",
405409
"--evnode.raft.send_timeout=300ms",

0 commit comments

Comments
 (0)