diff --git a/eth/stagedsync/bor_heimdall_shared.go b/eth/stagedsync/bor_heimdall_shared.go index 2569e8228a1..65bad0995ab 100644 --- a/eth/stagedsync/bor_heimdall_shared.go +++ b/eth/stagedsync/bor_heimdall_shared.go @@ -378,18 +378,20 @@ func fetchRequiredHeimdallStateSyncEventsIfNeeded( logPrefix string, logger log.Logger, lastStateSyncEventID uint64, -) (uint64, int, time.Duration, error) { + skipCount int, +) (uint64, int, int, time.Duration, error) { headerNum := header.Number.Uint64() if headerNum%borConfig.CalculateSprintLength(headerNum) != 0 || headerNum == 0 { // we fetch events only at beginning of each sprint - return lastStateSyncEventID, 0, 0, nil + return lastStateSyncEventID, 0, skipCount, 0, nil } return fetchAndWriteHeimdallStateSyncEvents( ctx, header, lastStateSyncEventID, + skipCount, tx, borConfig, blockReader, @@ -404,13 +406,14 @@ func fetchAndWriteHeimdallStateSyncEvents( ctx context.Context, header *types.Header, lastStateSyncEventID uint64, + skipCount int, tx kv.RwTx, config *borcfg.BorConfig, blockReader services.FullBlockReader, heimdallClient heimdall.HeimdallClient, chainID string, logPrefix string, - logger log.Logger) (uint64, int, time.Duration, error) { + logger log.Logger) (uint64, int, int, time.Duration, error) { fetchStart := time.Now() // Find out the latest eventId var fromId uint64 @@ -419,13 +422,13 @@ func fetchAndWriteHeimdallStateSyncEvents( if blockNum%config.CalculateSprintLength(blockNum) != 0 || blockNum == 0 { // we fetch events only at beginning of each sprint - return lastStateSyncEventID, 0, 0, nil + return lastStateSyncEventID, 0, skipCount, 0, nil } from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, blockReader) if err != nil { - return lastStateSyncEventID, 0, time.Since(fetchStart), err + return lastStateSyncEventID, 0, skipCount, time.Since(fetchStart), err } fetchTo := to @@ -457,11 +460,14 @@ func fetchAndWriteHeimdallStateSyncEvents( eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, fromId, fetchTo, fetchLimit) if err != nil { - return lastStateSyncEventID, 0, time.Since(fetchStart), err + return lastStateSyncEventID, 0, skipCount, time.Since(fetchStart), err } + var overrideCount int + if config.OverrideStateSyncRecords != nil { if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok { + overrideCount = len(eventRecords) - val eventRecords = eventRecords[0:val] } } @@ -491,10 +497,12 @@ func fetchAndWriteHeimdallStateSyncEvents( // don't apply this for devnets we may have looser state event constraints // (TODO these probably needs fixing) - if !(chainID == "1337") { + if skipCount > 0 { + skipCount-- + } else if !(chainID == "1337") { if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !(afterCheck(from, eventRecord.Time, initialRecordTime) && eventRecord.Time.Before(to)) { - return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf( + return lastStateSyncEventID, i, overrideCount, time.Since(fetchStart), fmt.Errorf( "invalid event record received %s, %s, %s, %s", fmt.Sprintf("blockNum=%d", blockNum), fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1), @@ -507,12 +515,12 @@ func fetchAndWriteHeimdallStateSyncEvents( data, err := eventRecord.MarshallBytes() if err != nil { logger.Error(fmt.Sprintf("[%s] Unable to pack txn for commitState", logPrefix), "err", err) - return lastStateSyncEventID, i, time.Since(fetchStart), err + return lastStateSyncEventID, i, skipCount + overrideCount, time.Since(fetchStart), err } eventIdBytes := eventRecord.MarshallIdBytes() if err = tx.Put(kv.BorEvents, eventIdBytes, data); err != nil { - return lastStateSyncEventID, i, time.Since(fetchStart), err + return lastStateSyncEventID, i, skipCount + overrideCount, time.Since(fetchStart), err } if initialRecordTime == nil { @@ -524,6 +532,8 @@ func fetchAndWriteHeimdallStateSyncEvents( lastEventRecord = eventRecord } + skipCount += overrideCount + if lastEventRecord != nil { logger.Debug("putting state sync events", "blockNum", blockNum, "lastID", lastEventRecord.ID) @@ -531,9 +541,9 @@ func fetchAndWriteHeimdallStateSyncEvents( binary.BigEndian.PutUint64(blockNumBuf[:], blockNum) eventIdBytes := lastEventRecord.MarshallIdBytes() if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBytes); err != nil { - return lastStateSyncEventID, len(eventRecords), time.Since(fetchStart), err + return lastStateSyncEventID, len(eventRecords), skipCount, time.Since(fetchStart), err } } - return lastStateSyncEventID, len(eventRecords), time.Since(fetchStart), nil + return lastStateSyncEventID, len(eventRecords), skipCount, time.Since(fetchStart), nil } diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 11ba6d2423b..6eeae3c40eb 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -231,6 +231,15 @@ func BorHeimdallForward( var nextEventRecord *heimdall.EventRecordWithTime + // sometimes via config eveents are skipped from particular blocks and + // pushed into the next one, when this happens we need to skip validation + // as the times won't match the expected window. In practice it only affects + // these blocks: 14949120,14949184, 14953472, 14953536, 14953600, 14953664, + // 14953728, 14953792, 14953856 so it seems keeping a local skip marker is good + // enough - it will only impact sync from origin operations. If + // this becomes more prevalent this will need to be re-thought + var skipCount int + for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ { select { default: @@ -287,6 +296,7 @@ func BorHeimdallForward( if cfg.blockReader.BorSnapshots().SegmentsMin() == 0 { snapTime = snapTime + time.Since(snapStart) // SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and + // SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and // UploadLocationFlag) when we remove snapshots based on FrozenBlockLimit and number of uploaded snapshots // avoid calling this if block for blockNums <= SegmentsMin to avoid reinsertion of snapshots snap := loadSnapshot(blockNum, header.Hash(), cfg.borConfig, recents, signatures, cfg.snapDb, logger) @@ -362,7 +372,7 @@ func BorHeimdallForward( var records int if lastStateSyncEventID == 0 || lastStateSyncEventID != endStateSyncEventId { - lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded( + lastStateSyncEventID, records, skipCount, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded( ctx, header, tx, @@ -373,6 +383,7 @@ func BorHeimdallForward( s.LogPrefix(), logger, lastStateSyncEventID, + skipCount, ) if err != nil { diff --git a/eth/stagedsync/stage_mining_bor_heimdall.go b/eth/stagedsync/stage_mining_bor_heimdall.go index 3561dce54cb..ece6865cb45 100644 --- a/eth/stagedsync/stage_mining_bor_heimdall.go +++ b/eth/stagedsync/stage_mining_bor_heimdall.go @@ -81,7 +81,7 @@ func MiningBorHeimdallForward( return err } - lastStateSyncEventID, records, fetchTime, err := fetchRequiredHeimdallStateSyncEventsIfNeeded( + lastStateSyncEventID, records, _, fetchTime, err := fetchRequiredHeimdallStateSyncEventsIfNeeded( ctx, header, tx, @@ -92,6 +92,7 @@ func MiningBorHeimdallForward( logPrefix, logger, lastStateSyncEventID, + 0, ) if err != nil {