Skip to content

Commit d7eda60

Browse files
authored
refactor(syncer,cache): use compare and swap loop and add comments (#2873)
## Overview Small cleanups, adding more comments to describe cache behavior. Make sure CompareAndSwap succeeds for DA height setting in cache.
1 parent 9a5eba1 commit d7eda60

File tree

6 files changed

+57
-98
lines changed

6 files changed

+57
-98
lines changed

apps/evm/single/cmd/rollback.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func NewRollbackCmd() *cobra.Command {
109109

110110
fmt.Printf("Rolled back ev-node state to height %d\n", height)
111111
if syncNode {
112-
fmt.Println("Restart the node with the `--clear-cache` flag")
112+
fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
113113
}
114114

115115
return errs

apps/testapp/cmd/rollback.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func NewRollbackCmd() *cobra.Command {
120120

121121
fmt.Printf("Rolled back ev-node state to height %d\n", height)
122122
if syncNode {
123-
fmt.Println("Restart the node with the `--clear-cache` flag")
123+
fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
124124
}
125125

126126
return errs

block/internal/cache/generic_cache.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,14 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6
9797
c.hashByHeight.Store(blockHeight, hash)
9898

9999
// Update max DA height if necessary
100-
current := c.maxDAHeight.Load()
101-
if daHeight >= current {
102-
_ = c.maxDAHeight.CompareAndSwap(current, daHeight)
100+
for {
101+
current := c.maxDAHeight.Load()
102+
if daHeight <= current {
103+
return
104+
}
105+
if c.maxDAHeight.CompareAndSwap(current, daHeight) {
106+
return
107+
}
103108
}
104109
}
105110

@@ -250,7 +255,7 @@ func (c *Cache[T]) LoadFromDisk(folderPath string) error {
250255
// Update max DA height during load
251256
current := c.maxDAHeight.Load()
252257
if v > current {
253-
_ = c.maxDAHeight.CompareAndSwap(current, v)
258+
c.maxDAHeight.Store(v)
254259
}
255260
}
256261

block/internal/syncing/syncer.go

Lines changed: 35 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ type Syncer struct {
5252
// State management
5353
lastState *atomic.Pointer[types.State]
5454

55-
// DA state
56-
daHeight *atomic.Uint64
55+
// DA retriever height
56+
daRetrieverHeight *atomic.Uint64
5757

5858
// P2P stores
5959
headerStore common.Broadcaster[*types.SignedHeader]
@@ -95,29 +95,28 @@ func NewSyncer(
9595
errorCh chan<- error,
9696
) *Syncer {
9797
return &Syncer{
98-
store: store,
99-
exec: exec,
100-
da: da,
101-
cache: cache,
102-
metrics: metrics,
103-
config: config,
104-
genesis: genesis,
105-
options: options,
106-
headerStore: headerStore,
107-
dataStore: dataStore,
108-
lastState: &atomic.Pointer[types.State]{},
109-
daHeight: &atomic.Uint64{},
110-
heightInCh: make(chan common.DAHeightEvent, 1_000),
111-
errorCh: errorCh,
112-
logger: logger.With().Str("component", "syncer").Logger(),
98+
store: store,
99+
exec: exec,
100+
da: da,
101+
cache: cache,
102+
metrics: metrics,
103+
config: config,
104+
genesis: genesis,
105+
options: options,
106+
headerStore: headerStore,
107+
dataStore: dataStore,
108+
lastState: &atomic.Pointer[types.State]{},
109+
daRetrieverHeight: &atomic.Uint64{},
110+
heightInCh: make(chan common.DAHeightEvent, 1_000),
111+
errorCh: errorCh,
112+
logger: logger.With().Str("component", "syncer").Logger(),
113113
}
114114
}
115115

116116
// Start begins the syncing component
117117
func (s *Syncer) Start(ctx context.Context) error {
118118
s.ctx, s.cancel = context.WithCancel(ctx)
119119

120-
// Initialize state
121120
if err := s.initializeState(); err != nil {
122121
return fmt.Errorf("failed to initialize syncer state: %w", err)
123122
}
@@ -131,12 +130,12 @@ func (s *Syncer) Start(ctx context.Context) error {
131130
s.p2pHandler.SetProcessedHeight(currentHeight)
132131
}
133132

133+
if !s.waitForGenesis() {
134+
return nil
135+
}
136+
134137
// Start main processing loop
135-
s.wg.Add(1)
136-
go func() {
137-
defer s.wg.Done()
138-
s.processLoop()
139-
}()
138+
go s.processLoop()
140139

141140
// Start dedicated workers for DA, and pending processing
142141
s.startSyncWorkers()
@@ -175,16 +174,6 @@ func (s *Syncer) SetLastState(state types.State) {
175174
s.lastState.Store(&state)
176175
}
177176

178-
// GetDAHeight returns the current DA height
179-
func (s *Syncer) GetDAHeight() uint64 {
180-
return max(s.daHeight.Load(), s.cache.DaHeight())
181-
}
182-
183-
// SetDAHeight updates the DA height
184-
func (s *Syncer) SetDAHeight(height uint64) {
185-
s.daHeight.Store(height)
186-
}
187-
188177
// initializeState loads the current sync state
189178
func (s *Syncer) initializeState() error {
190179
// Load state from store
@@ -216,14 +205,13 @@ func (s *Syncer) initializeState() error {
216205
}
217206
s.SetLastState(state)
218207

219-
// Set DA height
220-
// we get the max from the genesis da height, the state da height and the cache (fetched) da height
221-
// if a user has messed up and sync da too far ahead, on restart they can clear the cache (--clear-cache) and the retrieve will restart fetching from the last known block synced and executed from DA or the set genesis da height.
222-
s.SetDAHeight(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
208+
// Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height.
209+
// This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height has been advanced too far by user error, reset it with --evnode.clear_cache; the retriever will then resume from the last known executed DA height.
210+
s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
223211

224212
s.logger.Info().
225213
Uint64("height", state.LastBlockHeight).
226-
Uint64("da_height", s.GetDAHeight()).
214+
Uint64("da_height", s.daRetrieverHeight.Load()).
227215
Str("chain_id", state.ChainID).
228216
Msg("initialized syncer state")
229217

@@ -238,6 +226,9 @@ func (s *Syncer) initializeState() error {
238226

239227
// processLoop is the main coordination loop for processing events
240228
func (s *Syncer) processLoop() {
229+
s.wg.Add(1)
230+
defer s.wg.Done()
231+
241232
s.logger.Info().Msg("starting process loop")
242233
defer s.logger.Info().Msg("process loop stopped")
243234

@@ -261,10 +252,6 @@ func (s *Syncer) startSyncWorkers() {
261252
func (s *Syncer) daWorkerLoop() {
262253
defer s.wg.Done()
263254

264-
if !s.waitForGenesis() {
265-
return
266-
}
267-
268255
s.logger.Info().Msg("starting DA worker")
269256
defer s.logger.Info().Msg("DA worker stopped")
270257

@@ -299,13 +286,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
299286
default:
300287
}
301288

302-
daHeight := s.GetDAHeight()
289+
daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight())
303290

304291
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
305292
if err != nil {
306293
switch {
307294
case errors.Is(err, coreda.ErrBlobNotFound):
308-
s.SetDAHeight(daHeight + 1)
295+
s.daRetrieverHeight.Store(daHeight + 1)
309296
continue // Fetch next height immediately
310297
case errors.Is(err, coreda.ErrHeightFromFuture):
311298
s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
@@ -330,18 +317,14 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
330317
}
331318
}
332319

333-
// increment DA height on successful retrieval
334-
s.SetDAHeight(daHeight + 1)
320+
// increment DA retrieval height on successful retrieval
321+
s.daRetrieverHeight.Store(daHeight + 1)
335322
}
336323
}
337324

338325
func (s *Syncer) pendingWorkerLoop() {
339326
defer s.wg.Done()
340327

341-
if !s.waitForGenesis() {
342-
return
343-
}
344-
345328
s.logger.Info().Msg("starting pending worker")
346329
defer s.logger.Info().Msg("pending worker stopped")
347330

@@ -361,10 +344,6 @@ func (s *Syncer) pendingWorkerLoop() {
361344
func (s *Syncer) p2pWorkerLoop() {
362345
defer s.wg.Done()
363346

364-
if !s.waitForGenesis() {
365-
return
366-
}
367-
368347
logger := s.logger.With().Str("worker", "p2p").Logger()
369348
logger.Info().Msg("starting P2P worker")
370349
defer logger.Info().Msg("P2P worker stopped")
@@ -545,13 +524,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
545524
return err
546525
}
547526

548-
// Apply block
549527
newState, err := s.applyBlock(header.Header, data, currentState)
550528
if err != nil {
551529
return fmt.Errorf("failed to apply block: %w", err)
552530
}
553531

554532
// Update DA height if needed
533+
// This height is only updated when a height is processed from DA as P2P
534+
// events do not contain DA height information
555535
if event.DaHeight > newState.DAHeight {
556536
newState.DAHeight = event.DaHeight
557537
}
@@ -677,15 +657,6 @@ func (s *Syncer) sendCriticalError(err error) {
677657
}
678658
}
679659

680-
// sendNonBlockingSignal sends a signal without blocking
681-
func (s *Syncer) sendNonBlockingSignal(ch chan struct{}, name string) {
682-
select {
683-
case ch <- struct{}{}:
684-
default:
685-
s.logger.Debug().Str("channel", name).Msg("channel full, signal dropped")
686-
}
687-
}
688-
689660
// processPendingEvents fetches and processes pending events from cache
690661
// optimistically fetches the next events from cache until no matching heights are found
691662
func (s *Syncer) processPendingEvents() {

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func BenchmarkSyncerIO(b *testing.B) {
5858
}
5959
require.Len(b, fixt.s.heightInCh, 0)
6060

61-
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight)
61+
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
6262
gotStoreHeight, err := fixt.s.store.Height(b.Context())
6363
require.NoError(b, err)
6464
assert.Equal(b, spec.heights, gotStoreHeight)

block/internal/syncing/syncer_test.go

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -274,23 +274,6 @@ func TestSequentialBlockSync(t *testing.T) {
274274
requireEmptyChan(t, errChan)
275275
}
276276

277-
func TestSyncer_sendNonBlockingSignal(t *testing.T) {
278-
s := &Syncer{logger: zerolog.Nop()}
279-
ch := make(chan struct{}, 1)
280-
ch <- struct{}{}
281-
done := make(chan struct{})
282-
go func() {
283-
s.sendNonBlockingSignal(ch, "test")
284-
close(done)
285-
}()
286-
select {
287-
case <-done:
288-
// ok
289-
case <-time.After(200 * time.Millisecond):
290-
t.Fatal("sendNonBlockingSignal blocked unexpectedly")
291-
}
292-
}
293-
294277
func TestSyncer_processPendingEvents(t *testing.T) {
295278
ds := dssync.MutexWrap(datastore.NewMapDatastore())
296279
st := store.New(ds)
@@ -432,7 +415,7 @@ func TestSyncLoopPersistState(t *testing.T) {
432415
requireEmptyChan(t, errorCh)
433416

434417
t.Log("sync workers on instance1 completed")
435-
require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight())
418+
require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load())
436419

437420
// wait for all events consumed
438421
require.NoError(t, cacheMgr.SaveToDisk())
@@ -482,7 +465,7 @@ func TestSyncLoopPersistState(t *testing.T) {
482465
Run(func(arg mock.Arguments) {
483466
cancel()
484467
// retrieve last one again
485-
assert.Equal(t, syncerInst2.GetDAHeight(), arg.Get(1).(uint64))
468+
assert.Equal(t, syncerInst2.daRetrieverHeight.Load(), arg.Get(1).(uint64))
486469
}).
487470
Return(nil, nil)
488471

@@ -623,14 +606,14 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
623606

624607
// Create syncer with minimal dependencies
625608
syncer := &Syncer{
626-
store: mockStore,
627-
exec: mockExec,
628-
genesis: gen,
629-
lastState: &atomic.Pointer[types.State]{},
630-
daHeight: &atomic.Uint64{},
631-
logger: zerolog.Nop(),
632-
ctx: context.Background(),
633-
cache: cm,
609+
store: mockStore,
610+
exec: mockExec,
611+
genesis: gen,
612+
lastState: &atomic.Pointer[types.State]{},
613+
daRetrieverHeight: &atomic.Uint64{},
614+
logger: zerolog.Nop(),
615+
ctx: context.Background(),
616+
cache: cm,
634617
}
635618

636619
// Initialize state - this should call Replayer

0 commit comments

Comments (0)