Skip to content

Commit

Permalink
Don't validate skipped events (#11726)
Browse files Browse the repository at this point in the history
Sometimes via config events are skipped from particular blocks and
pushed into the next one, when this happens we need to skip validation
as the times won't match the expected window. 

In practice it only affects these blocks: 
  14949120,14949184, 14953472, 14953536, 14953600, 14953664,
  14953728, 14953792, 14953856 

So it seems keeping a local skip marker is good enough - it will only
impact sync from origin operations. If
this becomes more prevalent this will need to be re-thought

---------

Co-authored-by: alex.sharov <AskAlexSharov@gmail.com>
  • Loading branch information
mh0lt and AskAlexSharov authored Aug 24, 2024
1 parent d9b7cf1 commit a341d06
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
34 changes: 22 additions & 12 deletions eth/stagedsync/bor_heimdall_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,18 +378,20 @@ func fetchRequiredHeimdallStateSyncEventsIfNeeded(
logPrefix string,
logger log.Logger,
lastStateSyncEventID uint64,
) (uint64, int, time.Duration, error) {
skipCount int,
) (uint64, int, int, time.Duration, error) {

headerNum := header.Number.Uint64()
if headerNum%borConfig.CalculateSprintLength(headerNum) != 0 || headerNum == 0 {
// we fetch events only at beginning of each sprint
return lastStateSyncEventID, 0, 0, nil
return lastStateSyncEventID, 0, skipCount, 0, nil
}

return fetchAndWriteHeimdallStateSyncEvents(
ctx,
header,
lastStateSyncEventID,
skipCount,
tx,
borConfig,
blockReader,
Expand All @@ -404,13 +406,14 @@ func fetchAndWriteHeimdallStateSyncEvents(
ctx context.Context,
header *types.Header,
lastStateSyncEventID uint64,
skipCount int,
tx kv.RwTx,
config *borcfg.BorConfig,
blockReader services.FullBlockReader,
heimdallClient heimdall.HeimdallClient,
chainID string,
logPrefix string,
logger log.Logger) (uint64, int, time.Duration, error) {
logger log.Logger) (uint64, int, int, time.Duration, error) {
fetchStart := time.Now()
// Find out the latest eventId
var fromId uint64
Expand All @@ -419,13 +422,13 @@ func fetchAndWriteHeimdallStateSyncEvents(

if blockNum%config.CalculateSprintLength(blockNum) != 0 || blockNum == 0 {
// we fetch events only at beginning of each sprint
return lastStateSyncEventID, 0, 0, nil
return lastStateSyncEventID, 0, skipCount, 0, nil
}

from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, blockReader)

if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
return lastStateSyncEventID, 0, skipCount, time.Since(fetchStart), err
}

fetchTo := to
Expand Down Expand Up @@ -457,11 +460,14 @@ func fetchAndWriteHeimdallStateSyncEvents(

eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, fromId, fetchTo, fetchLimit)
if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
return lastStateSyncEventID, 0, skipCount, time.Since(fetchStart), err
}

var overrideCount int

if config.OverrideStateSyncRecords != nil {
if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok {
overrideCount = len(eventRecords) - val
eventRecords = eventRecords[0:val]
}
}
Expand Down Expand Up @@ -491,10 +497,12 @@ func fetchAndWriteHeimdallStateSyncEvents(

// don't apply this for devnets we may have looser state event constraints
// (TODO these probably needs fixing)
if !(chainID == "1337") {
if skipCount > 0 {
skipCount--
} else if !(chainID == "1337") {
if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID ||
!(afterCheck(from, eventRecord.Time, initialRecordTime) && eventRecord.Time.Before(to)) {
return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf(
return lastStateSyncEventID, i, overrideCount, time.Since(fetchStart), fmt.Errorf(
"invalid event record received %s, %s, %s, %s",
fmt.Sprintf("blockNum=%d", blockNum),
fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1),
Expand All @@ -507,12 +515,12 @@ func fetchAndWriteHeimdallStateSyncEvents(
data, err := eventRecord.MarshallBytes()
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack txn for commitState", logPrefix), "err", err)
return lastStateSyncEventID, i, time.Since(fetchStart), err
return lastStateSyncEventID, i, skipCount + overrideCount, time.Since(fetchStart), err
}

eventIdBytes := eventRecord.MarshallIdBytes()
if err = tx.Put(kv.BorEvents, eventIdBytes, data); err != nil {
return lastStateSyncEventID, i, time.Since(fetchStart), err
return lastStateSyncEventID, i, skipCount + overrideCount, time.Since(fetchStart), err
}

if initialRecordTime == nil {
Expand All @@ -524,16 +532,18 @@ func fetchAndWriteHeimdallStateSyncEvents(
lastEventRecord = eventRecord
}

skipCount += overrideCount

if lastEventRecord != nil {
logger.Debug("putting state sync events", "blockNum", blockNum, "lastID", lastEventRecord.ID)

var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
eventIdBytes := lastEventRecord.MarshallIdBytes()
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBytes); err != nil {
return lastStateSyncEventID, len(eventRecords), time.Since(fetchStart), err
return lastStateSyncEventID, len(eventRecords), skipCount, time.Since(fetchStart), err
}
}

return lastStateSyncEventID, len(eventRecords), time.Since(fetchStart), nil
return lastStateSyncEventID, len(eventRecords), skipCount, time.Since(fetchStart), nil
}
13 changes: 12 additions & 1 deletion eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ func BorHeimdallForward(

var nextEventRecord *heimdall.EventRecordWithTime

// sometimes via config eveents are skipped from particular blocks and
// pushed into the next one, when this happens we need to skip validation
// as the times won't match the expected window. In practice it only affects
// these blocks: 14949120,14949184, 14953472, 14953536, 14953600, 14953664,
// 14953728, 14953792, 14953856 so it seems keeping a local skip marker is good
// enough - it will only impact sync from origin operations. If
// this becomes more prevalent this will need to be re-thought
var skipCount int

for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
Expand Down Expand Up @@ -287,6 +296,7 @@ func BorHeimdallForward(
if cfg.blockReader.BorSnapshots().SegmentsMin() == 0 {
snapTime = snapTime + time.Since(snapStart)
// SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and
// SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and
// UploadLocationFlag) when we remove snapshots based on FrozenBlockLimit and number of uploaded snapshots
// avoid calling this if block for blockNums <= SegmentsMin to avoid reinsertion of snapshots
snap := loadSnapshot(blockNum, header.Hash(), cfg.borConfig, recents, signatures, cfg.snapDb, logger)
Expand Down Expand Up @@ -362,7 +372,7 @@ func BorHeimdallForward(
var records int

if lastStateSyncEventID == 0 || lastStateSyncEventID != endStateSyncEventId {
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
lastStateSyncEventID, records, skipCount, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
Expand All @@ -373,6 +383,7 @@ func BorHeimdallForward(
s.LogPrefix(),
logger,
lastStateSyncEventID,
skipCount,
)

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion eth/stagedsync/stage_mining_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func MiningBorHeimdallForward(
return err
}

lastStateSyncEventID, records, fetchTime, err := fetchRequiredHeimdallStateSyncEventsIfNeeded(
lastStateSyncEventID, records, _, fetchTime, err := fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
Expand All @@ -92,6 +92,7 @@ func MiningBorHeimdallForward(
logPrefix,
logger,
lastStateSyncEventID,
0,
)

if err != nil {
Expand Down

0 comments on commit a341d06

Please sign in to comment.