Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/devnet/services/polygon/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (h *Heimdall) FetchStateSyncEvents(ctx context.Context, fromID uint64, to t
return nil, fmt.Errorf("TODO")
}

func (h *Heimdall) FetchStateSyncEvent(ctx context.Context, id uint64) (*heimdall.EventRecordWithTime, error) {
return nil, fmt.Errorf("TODO")
}

func (h *Heimdall) Close() {
h.unsubscribe()
}
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/bor_heimdall_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func fetchAndWriteHeimdallStateSyncEvents(

from = lastStateSyncEventID + 1

logger.Debug(
logger.Trace(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
Expand Down
90 changes: 74 additions & 16 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
Expand All @@ -14,10 +15,11 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/chain/networkname"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/dataflow"
Expand Down Expand Up @@ -90,6 +92,8 @@ func StageBorHeimdallCfg(
}
}

var lastMumbaiEventRecord *heimdall.EventRecordWithTime

func BorHeimdallForward(
s *StageState,
u Unwinder,
Expand Down Expand Up @@ -168,6 +172,8 @@ func BorHeimdallForward(
var fetchTime time.Duration
var snapTime time.Duration
var snapInitTime time.Duration
var syncEventTime time.Duration

var eventRecords int

lastSpanID, err := fetchRequiredHeimdallSpansIfNeeded(ctx, headNumber, tx, cfg, s.LogPrefix(), logger)
Expand All @@ -185,6 +191,9 @@ func BorHeimdallForward(
defer logTimer.Stop()

logger.Info(fmt.Sprintf("[%s] Processing sync events...", s.LogPrefix()), "from", lastBlockNum+1, "to", headNumber)

var nextEventRecord *heimdall.EventRecordWithTime

for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
Expand All @@ -195,7 +204,8 @@ func BorHeimdallForward(
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"fetch time", fetchTime,
"sync-events", syncEventTime,
"sync-event-fetch", fetchTime,
"snaps", snapTime,
"snap-init", snapInitTime,
"process time", time.Since(processStart),
Expand Down Expand Up @@ -291,23 +301,70 @@ func BorHeimdallForward(
return err
}

syncEventStart := time.Now()

var callTime time.Duration
var records int
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
s.LogPrefix(),
logger,
lastStateSyncEventID,
)
if err != nil {
return err

var endStateSyncEventId uint64

// mumbai event records have stopped being produced as of march 2024
// as part of the goerli decom - so there is no point trying to
// fetch them
if cfg.chainConfig.ChainName == networkname.MumbaiChainName {
if nextEventRecord == nil {
nextEventRecord = lastMumbaiEventRecord
}
}

if nextEventRecord == nil || header.Time > uint64(nextEventRecord.Time.Unix()) {
var records int

if lastStateSyncEventID == 0 || lastStateSyncEventID != endStateSyncEventId {
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
s.LogPrefix(),
logger,
lastStateSyncEventID,
)

if err != nil {
return err
}
}

if records != 0 {
nextEventRecord = nil
eventRecords += records
} else {
if nextEventRecord == nil || nextEventRecord.ID <= lastStateSyncEventID {
if eventRecord, err := cfg.heimdallClient.FetchStateSyncEvent(ctx, lastStateSyncEventID+1); err == nil {
nextEventRecord = eventRecord
endStateSyncEventId = 0
} else {
if !errors.Is(err, heimdall.ErrEventRecordNotFound) {
return err
}

if cfg.chainConfig.ChainName == networkname.MumbaiChainName && lastStateSyncEventID == 276850 {
lastMumbaiEventRecord = &heimdall.EventRecordWithTime{
EventRecord: heimdall.EventRecord{
ID: 276851,
},
Time: time.Unix(math.MaxInt64, 0),
}
}

endStateSyncEventId = lastStateSyncEventID
}
}
}
}

eventRecords += records
fetchTime += callTime
syncEventTime = syncEventTime + time.Since(syncEventStart)

if cfg.loopBreakCheck != nil && cfg.loopBreakCheck(int(blockNum-lastBlockNum)) {
headNumber = blockNum
Expand All @@ -331,6 +388,7 @@ func BorHeimdallForward(
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"sync event time", syncEventTime,
"fetch time", fetchTime,
"snap time", snapTime,
"process time", time.Since(processStart),
Expand Down Expand Up @@ -455,7 +513,7 @@ func persistValidatorSets(
var err error
if snap, err = snap.Apply(parent, headers, logger); err != nil {
if snap != nil {
var badHash common.Hash
var badHash libcommon.Hash
for _, header := range headers {
if header.Number.Uint64() == snap.Number+1 {
badHash = header.Hash()
Expand Down
7 changes: 7 additions & 0 deletions eth/stagedsync/stagedsynctest/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ func (h *Harness) mockHeimdallClient() {
return []*heimdall.EventRecordWithTime{&newEvent}, nil
}).
AnyTimes()
h.heimdallClient.
EXPECT().
FetchStateSyncEvent(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, _ uint64) (*heimdall.EventRecordWithTime, error) {
return nil, heimdall.ErrEventRecordNotFound
}).
AnyTimes()
}

func (h *Harness) runSyncStageForwardWithErrorIs(
Expand Down
4 changes: 4 additions & 0 deletions polygon/bor/bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (h test_heimdall) FetchStateSyncEvents(ctx context.Context, fromID uint64,
return nil, nil
}

func (h *test_heimdall) FetchStateSyncEvent(ctx context.Context, id uint64) (*heimdall.EventRecordWithTime, error) {
return nil, nil
}

func (h *test_heimdall) FetchSpan(ctx context.Context, spanID uint64) (*heimdall.Span, error) {

if span, ok := h.spans[heimdall.SpanId(spanID)]; ok {
Expand Down
38 changes: 35 additions & 3 deletions polygon/heimdall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
//go:generate mockgen -destination=./client_mock.go -package=heimdall . HeimdallClient
type HeimdallClient interface {
FetchStateSyncEvents(ctx context.Context, fromId uint64, to time.Time, limit int) ([]*EventRecordWithTime, error)
FetchStateSyncEvent(ctx context.Context, id uint64) (*EventRecordWithTime, error)

FetchLatestSpan(ctx context.Context) (*Span, error)
FetchSpan(ctx context.Context, spanID uint64) (*Span, error)
Expand Down Expand Up @@ -106,6 +107,7 @@ func newHeimdallClient(urlString string, httpClient HttpClient, retryBackOff tim
const (
fetchStateSyncEventsFormat = "from-id=%d&to-time=%d&limit=%d"
fetchStateSyncEventsPath = "clerk/event-record/list"
fetchStateSyncEvent = "clerk/event-record/%s"

fetchCheckpoint = "/checkpoints/%s"
fetchCheckpointCount = "/checkpoints/count"
Expand All @@ -130,12 +132,12 @@ func (c *Client) FetchStateSyncEvents(ctx context.Context, fromID uint64, to tim
eventRecords := make([]*EventRecordWithTime, 0)

for {
url, err := stateSyncURL(c.urlString, fromID, to.Unix())
url, err := stateSyncListURL(c.urlString, fromID, to.Unix())
if err != nil {
return nil, err
}

c.logger.Debug(heimdallLogPrefix("Fetching state sync events"), "queryParams", url.RawQuery)
c.logger.Trace(heimdallLogPrefix("Fetching state sync events"), "queryParams", url.RawQuery)

ctx = withRequestType(ctx, stateSyncRequest)

Expand Down Expand Up @@ -173,6 +175,32 @@ func (c *Client) FetchStateSyncEvents(ctx context.Context, fromID uint64, to tim
return eventRecords, nil
}

func (c *Client) FetchStateSyncEvent(ctx context.Context, id uint64) (*EventRecordWithTime, error) {
url, err := stateSyncURL(c.urlString, id)

if err != nil {
return nil, err
}

ctx = withRequestType(ctx, stateSyncRequest)

isRecoverableError := func(err error) bool {
return !strings.Contains(err.Error(), "could not get state record; No record found")
}

response, err := FetchWithRetryEx[StateSyncEventResponse](ctx, c, url, isRecoverableError, c.logger)

if err != nil {
if strings.Contains(err.Error(), "could not get state record; No record found") {
return nil, ErrEventRecordNotFound
}

return nil, err
}

return &response.Result, nil
}

func (c *Client) FetchLatestSpan(ctx context.Context) (*Span, error) {
url, err := latestSpanURL(c.urlString)
if err != nil {
Expand Down Expand Up @@ -457,12 +485,16 @@ func latestSpanURL(urlString string) (*url.URL, error) {
return makeURL(urlString, fetchSpanLatest, "")
}

func stateSyncURL(urlString string, fromID uint64, to int64) (*url.URL, error) {
func stateSyncListURL(urlString string, fromID uint64, to int64) (*url.URL, error) {
queryParams := fmt.Sprintf(fetchStateSyncEventsFormat, fromID, to, stateFetchLimit)

return makeURL(urlString, fetchStateSyncEventsPath, queryParams)
}

func stateSyncURL(urlString string, id uint64) (*url.URL, error) {
return makeURL(urlString, fmt.Sprintf(fetchStateSyncEvent, fmt.Sprint(id)), "")
}

func checkpointURL(urlString string, number int64) (*url.URL, error) {
url := ""
if number == -1 {
Expand Down
15 changes: 15 additions & 0 deletions polygon/heimdall/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions polygon/heimdall/event_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type EventRecordWithTime struct {
Time time.Time `json:"record_time" yaml:"record_time"`
}

var ErrEventRecordNotFound = fmt.Errorf("event record not found")

// String returns the string representatin of a state record
func (e *EventRecordWithTime) String() string {
return fmt.Sprintf(
Expand Down Expand Up @@ -78,3 +80,8 @@ type StateSyncEventsResponse struct {
Height string `json:"height"`
Result []*EventRecordWithTime `json:"result"`
}

type StateSyncEventResponse struct {
Height string `json:"height"`
Result EventRecordWithTime `json:"result"`
}