
Commit 1cf1167

Merge branch 'main' into alex/adr023_ha-failover_raft11

* main:
  fix: inconsistent state detection and rollback (#2983)
  chore: improve graceful shutdown restarts (#2985)
  feat(submitting): add posting strategies (#2973)

2 parents 3203ce6 + 73297c1, commit 1cf1167

32 files changed: 1009 additions, 240 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `post-tx` command and force inclusion server to submit transactions directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888))
   Additionally, modified the core package to support marking transactions as forced-included transactions.
   The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool.
+- Add batching strategies (the default stays time-based, unchanged from previous betas). Currently available strategies are `time`, `size`, `immediate` and `adaptive`.

 ### Changed
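
The strategy names above come straight from the changelog entry; the sketch below is purely illustrative. It shows what a pluggable batching strategy could look like, assuming a simple `ShouldFlush` interface. The actual ev-node types and constructors are not part of this commit view, and the `adaptive` strategy's heuristics are not described here, so it is omitted.

package batching // hypothetical package, not the ev-node API

import "time"

// BatchStrategy decides when the submitter should flush the pending batch
// to the DA layer. Illustrative only.
type BatchStrategy interface {
    // ShouldFlush reports whether the current batch should be submitted now.
    ShouldFlush(pendingTxs, pendingBytes int, sinceLastFlush time.Duration) bool
}

// timeStrategy flushes on a fixed interval (the documented default behaviour).
type timeStrategy struct{ maxWait time.Duration }

func (s timeStrategy) ShouldFlush(_, _ int, since time.Duration) bool {
    return since >= s.maxWait
}

// sizeStrategy flushes once the batch reaches a byte threshold.
type sizeStrategy struct{ maxBytes int }

func (s sizeStrategy) ShouldFlush(_, pendingBytes int, _ time.Duration) bool {
    return pendingBytes >= s.maxBytes
}

// immediateStrategy flushes as soon as anything is pending.
type immediateStrategy struct{}

func (immediateStrategy) ShouldFlush(pendingTxs, _ int, _ time.Duration) bool {
    return pendingTxs > 0
}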

apps/evm/cmd/rollback.go

Lines changed: 55 additions & 2 deletions

@@ -1,13 +1,18 @@
 package cmd

 import (
+    "bytes"
     "context"
     "errors"
     "fmt"
+    "os"

+    "github.com/ethereum/go-ethereum/common"
+    ds "github.com/ipfs/go-datastore"
     "github.com/spf13/cobra"

     goheaderstore "github.com/celestiaorg/go-header/store"
+    "github.com/evstack/ev-node/execution/evm"
     rollcmd "github.com/evstack/ev-node/pkg/cmd"
     "github.com/evstack/ev-node/pkg/store"
     "github.com/evstack/ev-node/types"
@@ -42,7 +47,7 @@ func NewRollbackCmd() *cobra.Command {

         defer func() {
             if closeErr := rawEvolveDB.Close(); closeErr != nil {
-                fmt.Printf("Warning: failed to close evolve database: %v\n", closeErr)
+                cmd.Printf("Warning: failed to close evolve database: %v\n", closeErr)
             }
         }()

@@ -63,6 +68,17 @@ func NewRollbackCmd() *cobra.Command {
             return fmt.Errorf("failed to rollback ev-node state: %w", err)
         }

+        // rollback execution layer via EngineClient
+        engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
+        if err != nil {
+            cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
+        } else {
+            if err := engineClient.Rollback(goCtx, height); err != nil {
+                return fmt.Errorf("failed to rollback execution layer: %w", err)
+            }
+            cmd.Printf("Rolled back execution layer to height %d\n", height)
+        }
+
         // rollback ev-node goheader state
         headerStore, err := goheaderstore.NewStore[*types.SignedHeader](
             evolveDB,
@@ -101,7 +117,7 @@ func NewRollbackCmd() *cobra.Command {
             errs = errors.Join(errs, fmt.Errorf("failed to rollback data sync service state: %w", err))
         }

-        fmt.Printf("Rolled back ev-node state to height %d\n", height)
+        cmd.Printf("Rolled back ev-node state to height %d\n", height)
         if syncNode {
             fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
         }
@@ -113,5 +129,42 @@ func NewRollbackCmd() *cobra.Command {
     cmd.Flags().Uint64Var(&height, "height", 0, "rollback to a specific height")
     cmd.Flags().BoolVar(&syncNode, "sync-node", false, "sync node (no aggregator)")

+    // EVM flags for execution layer rollback
+    cmd.Flags().String(evm.FlagEvmEthURL, "http://localhost:8545", "URL of the Ethereum JSON-RPC endpoint")
+    cmd.Flags().String(evm.FlagEvmEngineURL, "http://localhost:8551", "URL of the Engine API endpoint")
+    cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication")
+
     return cmd
 }
+
+func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
+    ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err)
+    }
+    engineURL, err := cmd.Flags().GetString(evm.FlagEvmEngineURL)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEngineURL, err)
+    }
+
+    jwtSecretFile, err := cmd.Flags().GetString(evm.FlagEvmJWTSecretFile)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmJWTSecretFile, err)
+    }
+
+    if jwtSecretFile == "" {
+        return nil, fmt.Errorf("JWT secret file must be provided via --evm.jwt-secret-file for EL rollback")
+    }
+
+    secretBytes, err := os.ReadFile(jwtSecretFile)
+    if err != nil {
+        return nil, fmt.Errorf("failed to read JWT secret from file '%s': %w", jwtSecretFile, err)
+    }
+    jwtSecret := string(bytes.TrimSpace(secretBytes))
+
+    if jwtSecret == "" {
+        return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
+    }
+
+    return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false)
+}
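
For context on the new behaviour: when `--evm.jwt-secret-file` is left empty, `createRollbackEngineClient` returns an error and the command skips the execution-layer rollback instead of failing. The package-internal test below is a hedged sketch of that fallback path; it is not part of this commit and assumes no extra setup is needed to construct the command.

package cmd

import "testing"

// Sketch only: the engine client must not be created when the JWT secret file
// flag is left at its empty default, which is the condition under which the
// rollback command skips the execution-layer rollback.
func TestRollbackSkipsELWithoutJWTSecret(t *testing.T) {
    rollbackCmd := NewRollbackCmd()

    if _, err := createRollbackEngineClient(rollbackCmd, nil); err == nil {
        t.Fatal("expected an error when no JWT secret file is provided")
    }
}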

block/internal/cache/bench_test.go

Lines changed: 2 additions & 2 deletions

@@ -72,7 +72,7 @@ func BenchmarkManager_GetPendingHeaders(b *testing.B) {
     b.ReportAllocs()
     b.ResetTimer()
     for b.Loop() {
-        hs, err := m.GetPendingHeaders(ctx)
+        hs, _, err := m.GetPendingHeaders(ctx)
         if err != nil {
             b.Fatal(err)
         }
@@ -93,7 +93,7 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
     b.ReportAllocs()
     b.ResetTimer()
     for b.Loop() {
-        ds, err := m.GetPendingData(ctx)
+        ds, _, err := m.GetPendingData(ctx)
         if err != nil {
             b.Fatal(err)
         }

block/internal/cache/manager.go

Lines changed: 11 additions & 9 deletions

@@ -78,8 +78,8 @@ type CacheManager interface {

 // PendingManager provides operations for managing pending headers and data
 type PendingManager interface {
-    GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
-    GetPendingData(ctx context.Context) ([]*types.SignedData, error)
+    GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error)
+    GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error)
     SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
     GetLastSubmittedHeaderHeight() uint64
     SetLastSubmittedDataHeight(ctx context.Context, height uint64)
@@ -320,20 +320,21 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
 }

 // Pending operations
-func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
+func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
     return m.pendingHeaders.GetPendingHeaders(ctx)
 }

-func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, error) {
-    // Get pending raw data
-    dataList, err := m.pendingData.GetPendingData(ctx)
+func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) {
+    // Get pending raw data with marshalled bytes
+    dataList, marshalledData, err := m.pendingData.GetPendingData(ctx)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }

     // Convert to SignedData (this logic was in manager.go)
     signedDataList := make([]*types.SignedData, 0, len(dataList))
-    for _, data := range dataList {
+    marshalledSignedData := make([][]byte, 0, len(dataList))
+    for i, data := range dataList {
         if len(data.Txs) == 0 {
             continue // Skip empty data
         }
@@ -344,9 +345,10 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat
             Data: *data,
             // Signature and Signer will be set by executing component
         })
+        marshalledSignedData = append(marshalledSignedData, marshalledData[i])
     }

-    return signedDataList, nil
+    return signedDataList, marshalledSignedData, nil
 }

 func (m *implementation) GetLastSubmittedHeaderHeight() uint64 {
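
With this interface change, callers receive the marshalled bytes index-aligned with the typed values (empty data is skipped in both slices). The helper below is a hedged sketch of how a caller inside the cache package might use that for size-aware batching; `pickWithinBudget` and the byte-budget logic are hypothetical and not part of this commit.

package cache

import (
    "context"

    "github.com/evstack/ev-node/types"
)

// pickWithinBudget is a hypothetical helper: it returns the longest prefix of
// pending SignedData whose cached encodings fit within maxBytes, reusing the
// marshalled bytes returned by GetPendingData instead of re-marshalling.
func pickWithinBudget(ctx context.Context, pm PendingManager, maxBytes int) ([]*types.SignedData, [][]byte, error) {
    signedData, marshalled, err := pm.GetPendingData(ctx)
    if err != nil {
        return nil, nil, err
    }

    total := 0
    for i := range signedData {
        total += len(marshalled[i]) // size of the cached encoding for signedData[i]
        if total > maxBytes {
            return signedData[:i], marshalled[:i], nil
        }
    }
    return signedData, marshalled, nil
}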

block/internal/cache/manager_test.go

Lines changed: 4 additions & 4 deletions

@@ -183,14 +183,14 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
     require.NoError(t, err)

     // headers: all 3 should be pending initially
-    headers, err := cm.GetPendingHeaders(ctx)
+    headers, _, err := cm.GetPendingHeaders(ctx)
     require.NoError(t, err)
     require.Len(t, headers, 3)
     assert.Equal(t, uint64(1), headers[0].Height())
     assert.Equal(t, uint64(3), headers[2].Height())

     // data: empty one filtered, so 2 and 3 only
-    signedData, err := cm.GetPendingData(ctx)
+    signedData, _, err := cm.GetPendingData(ctx)
     require.NoError(t, err)
     require.Len(t, signedData, 2)
     assert.Equal(t, uint64(2), signedData[0].Height())
@@ -200,12 +200,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
     cm.SetLastSubmittedHeaderHeight(ctx, 1)
     cm.SetLastSubmittedDataHeight(ctx, 2)

-    headers, err = cm.GetPendingHeaders(ctx)
+    headers, _, err = cm.GetPendingHeaders(ctx)
     require.NoError(t, err)
     require.Len(t, headers, 2)
     assert.Equal(t, uint64(2), headers[0].Height())

-    signedData, err = cm.GetPendingData(ctx)
+    signedData, _, err = cm.GetPendingData(ctx)
     require.NoError(t, err)
     require.Len(t, signedData, 1)
     assert.Equal(t, uint64(3), signedData[0].Height())

block/internal/cache/pending_base.go

Lines changed: 30 additions & 0 deletions

@@ -5,6 +5,7 @@ import (
     "encoding/binary"
     "errors"
     "fmt"
+    "sync"
     "sync/atomic"

     ds "github.com/ipfs/go-datastore"
@@ -22,6 +23,9 @@ type pendingBase[T any] struct {
     metaKey string
     fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
     lastHeight atomic.Uint64
+
+    // Marshalling cache to avoid redundant marshalling
+    marshalledCache sync.Map // key: uint64 (height), value: []byte
 }

 // newPendingBase constructs a new pendingBase for a given type.
@@ -84,6 +88,9 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub
         if err != nil {
             pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA")
         }
+
+        // Clear marshalled cache for submitted heights
+        pb.clearMarshalledCacheUpTo(newLastSubmittedHeight)
     }
 }

@@ -105,3 +112,26 @@ func (pb *pendingBase[T]) init() error {
     pb.lastHeight.CompareAndSwap(0, lsh)
     return nil
 }
+
+// getMarshalledForHeight returns cached marshalled bytes for a height, or nil if not cached
+func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte {
+    if val, ok := pb.marshalledCache.Load(height); ok {
+        return val.([]byte)
+    }
+    return nil
+}
+
+// setMarshalledForHeight caches marshalled bytes for a height
+func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) {
+    pb.marshalledCache.Store(height, marshalled)
+}
+
+// clearMarshalledCacheUpTo removes cached marshalled bytes up to and including the given height
+func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) {
+    pb.marshalledCache.Range(func(key, _ any) bool {
+        if h := key.(uint64); h <= height {
+            pb.marshalledCache.Delete(h)
+        }
+        return true
+    })
+}
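
A hedged sketch of a package-internal test for the new cache helpers, assuming the zero value of `pendingBase` is usable for these methods (they only touch the `sync.Map`); it is not part of the commit.

package cache

import "testing"

// Sketch only: clearMarshalledCacheUpTo evicts cached encodings at or below
// the given height, mirroring what setLastSubmittedHeight does after a
// successful DA submission.
func TestMarshalledCacheEviction(t *testing.T) {
    pb := &pendingBase[struct{}]{}

    pb.setMarshalledForHeight(1, []byte("h1"))
    pb.setMarshalledForHeight(2, []byte("h2"))
    pb.setMarshalledForHeight(3, []byte("h3"))

    pb.clearMarshalledCacheUpTo(2)

    if got := pb.getMarshalledForHeight(2); got != nil {
        t.Fatalf("height 2 should have been evicted, got %q", got)
    }
    if got := pb.getMarshalledForHeight(3); string(got) != "h3" {
        t.Fatalf("height 3 should still be cached, got %q", got)
    }
}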

block/internal/cache/pending_base_test.go

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) {
     // ensure store height stays lower (0)
     ph, err := NewPendingHeaders(st, logger)
     require.NoError(t, err)
-    pending, err := ph.GetPendingHeaders(ctx)
+    pending, _, err := ph.GetPendingHeaders(ctx)
     assert.Error(t, err)
     assert.Len(t, pending, 0)

block/internal/cache/pending_data.go

Lines changed: 37 additions & 3 deletions

@@ -3,6 +3,7 @@ package cache
 import (
     "context"
     "errors"
+    "fmt"

     "github.com/rs/zerolog"

@@ -56,9 +57,42 @@ func (pd *PendingData) init() error {
     return pd.base.init()
 }

-// GetPendingData returns a sorted slice of pending Data.
-func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) {
-    return pd.base.getPending(ctx)
+// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes.
+// It uses an internal cache to avoid re-marshalling data on subsequent calls.
+func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) {
+    dataList, err := pd.base.getPending(ctx)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    if len(dataList) == 0 {
+        return nil, nil, nil
+    }
+
+    marshalled := make([][]byte, len(dataList))
+    lastSubmitted := pd.base.lastHeight.Load()
+
+    for i, data := range dataList {
+        height := lastSubmitted + uint64(i) + 1
+
+        // Try to get from cache first
+        if cached := pd.base.getMarshalledForHeight(height); cached != nil {
+            marshalled[i] = cached
+            continue
+        }
+
+        // Marshal if not in cache
+        dataBytes, err := data.MarshalBinary()
+        if err != nil {
+            return nil, nil, fmt.Errorf("failed to marshal data at height %d: %w", height, err)
+        }
+        marshalled[i] = dataBytes
+
+        // Store in cache
+        pd.base.setMarshalledForHeight(height, dataBytes)
+    }
+
+    return dataList, marshalled, nil
 }

 func (pd *PendingData) NumPendingData() uint64 {
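
To make the cache-key derivation in `GetPendingData` concrete, here is a small standalone example of the `lastSubmitted + i + 1` mapping; the numbers are illustrative only.

package main

import "fmt"

func main() {
    // Suppose the last Data submitted to the DA layer was at height 5 and
    // three Data items are pending. getPending returns them sorted by height,
    // so the cache keys derived in GetPendingData are 6, 7 and 8.
    lastSubmitted := uint64(5)
    for i := 0; i < 3; i++ {
        fmt.Printf("pending index %d -> cache key height %d\n", i, lastSubmitted+uint64(i)+1)
    }
    // A second call made before the last-submitted height advances past these
    // heights reuses the cached bytes instead of calling MarshalBinary again.
}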

block/internal/cache/pending_data_test.go

Lines changed: 3 additions & 3 deletions

@@ -39,7 +39,7 @@ func TestPendingData_BasicFlow(t *testing.T) {

     // initially all 3 data items are pending, incl. empty
     require.Equal(t, uint64(3), pendingData.NumPendingData())
-    pendingDataList, err := pendingData.GetPendingData(ctx)
+    pendingDataList, _, err := pendingData.GetPendingData(ctx)
     require.NoError(t, err)
     require.Len(t, pendingDataList, 3)
     require.Equal(t, uint64(1), pendingDataList[0].Height())
@@ -53,7 +53,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
     require.Equal(t, uint64(1), binary.LittleEndian.Uint64(metadataRaw))

     require.Equal(t, uint64(2), pendingData.NumPendingData())
-    pendingDataList, err = pendingData.GetPendingData(ctx)
+    pendingDataList, _, err = pendingData.GetPendingData(ctx)
     require.NoError(t, err)
     require.Len(t, pendingDataList, 2)
     require.Equal(t, uint64(2), pendingDataList[0].Height())
@@ -97,7 +97,7 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
     require.NoError(t, err)

     // fetching pending should propagate the not-found error from store
-    pending, err := pendingData.GetPendingData(ctx)
+    pending, _, err := pendingData.GetPendingData(ctx)
     require.Error(t, err)
     require.Empty(t, pending)
 }
