diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index bfc8bc08ef58..f90aa2695e63 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -178,8 +178,11 @@ func (s *RollupSyncService) fetchRollupEvents() { log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) + latestProcessedBlock := s.latestProcessedBlock.Load() + updatedLatestProcessedBlock := latestProcessedBlock + // query in batches - for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -201,8 +204,10 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - s.latestProcessedBlock.Store(to) + updatedLatestProcessedBlock = to } + + s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) } func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endBlockNumber uint64) error { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index f32979671211..9fc50d68a9c7 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -164,7 +164,10 @@ func (s *SyncService) fetchMessages() { return } - log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed) + latestProcessedBlock := s.latestProcessedBlock.Load() + updatedLatestProcessedBlock := latestProcessedBlock + + log.Trace("Sync service fetchMessages", "latestProcessedBlock", latestProcessedBlock, "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -194,7 +197,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - s.latestProcessedBlock.Store(lastBlock) + updatedLatestProcessedBlock = lastBlock } // ticker for logging progress @@ -202,7 +205,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := updatedLatestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -211,8 +214,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(updatedLatestProcessedBlock) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", updatedLatestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: } @@ -256,4 +259,6 @@ func (s *SyncService) fetchMessages() { flush(to) } } + + s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) }