diff --git a/polygon/bridge/bridge.go b/polygon/bridge/bridge.go index a412e9528b5..58ddf7dddcc 100644 --- a/polygon/bridge/bridge.go +++ b/polygon/bridge/bridge.go @@ -57,7 +57,6 @@ func (b *Bridge) Run(ctx context.Context) error { if err != nil { return err } - defer b.Close() // get last known sync ID lastEventID, err := b.store.GetLatestEventID(ctx) @@ -111,7 +110,7 @@ func (b *Bridge) ProcessNewBlocks(ctx context.Context, blocks []*types.Block) er eventMap := make(map[uint64]uint64) for _, block := range blocks { // check if block is start of span - if !b.isSprintStart(block.NumberU64()) { + if b.isSprintStart(block.NumberU64()) { continue } @@ -165,16 +164,12 @@ func (b *Bridge) GetEvents(ctx context.Context, blockNum uint64) ([]*types.Messa return nil, err } - if end == 0 { // exception for tip processing - end = b.lastProcessedEventID - } - b.log.Debug("got map", "blockNum", blockNum, "start", start, "end", end) - eventsRaw := make([]*types.Message, 0, end-start+1) + eventsRaw := make([]*types.Message, end-start+1) // get events from DB - events, err := b.store.GetEvents(ctx, start+1, end+1) + events, err := b.store.GetEvents(ctx, start, end) if err != nil { return nil, err } @@ -197,7 +192,6 @@ func (b *Bridge) GetEvents(ctx context.Context, blockNum uint64) ([]*types.Messa eventsRaw = append(eventsRaw, &msg) } - return eventsRaw, nil } diff --git a/polygon/bridge/bridge_test.go b/polygon/bridge/bridge_test.go deleted file mode 100644 index 3acc71c465f..00000000000 --- a/polygon/bridge/bridge_test.go +++ /dev/null @@ -1,251 +0,0 @@ -package bridge_test - -import ( - "context" - "errors" - "math/big" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - - "github.com/ledgerwatch/erigon-lib/common/hexutil" - "github.com/ledgerwatch/erigon-lib/log/v3" - "github.com/ledgerwatch/erigon/accounts/abi" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/polygon/bor" - "github.com/ledgerwatch/erigon/polygon/bor/borcfg" - "github.com/ledgerwatch/erigon/polygon/bridge" - "github.com/ledgerwatch/erigon/polygon/heimdall" - "github.com/ledgerwatch/erigon/polygon/polygoncommon" - "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/erigon/turbo/testlog" -) - -func setup(t *testing.T, abi abi.ABI) (*heimdall.MockHeimdallClient, *bridge.Bridge) { - ctrl := gomock.NewController(t) - logger := testlog.Logger(t, log.LvlDebug) - borConfig := borcfg.BorConfig{ - Sprint: map[string]uint64{"0": 2}, - StateReceiverContract: "0x0000000000000000000000000000000000001001", - } - - heimdallClient := heimdall.NewMockHeimdallClient(ctrl) - polygonBridgeDB := polygoncommon.NewDatabase(t.TempDir(), logger) - store := bridge.NewStore(polygonBridgeDB) - b := bridge.NewBridge(store, logger, &borConfig, heimdallClient.FetchStateSyncEvents, abi) - - return heimdallClient, b -} - -func getBlocks(t *testing.T, numBlocks int) []*types.Block { - // Feed in new blocks - rawBlocks := make([]*types.RawBlock, 0, numBlocks) - - for i := 0; i < numBlocks; i++ { - rawBlocks = append(rawBlocks, &types.RawBlock{ - Header: &types.Header{ - Number: big.NewInt(int64(i)), - Time: uint64(50 * (i + 1)), - }, - Body: &types.RawBody{}, - }) - } - - blocks := make([]*types.Block, len(rawBlocks)) - - for i, rawBlock := range rawBlocks { - b, err := rawBlock.AsBlock() - require.NoError(t, err) - - blocks[i] = b - } - - return blocks -} - -func TestBridge(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stateReceiverABI := bor.GenesisContractStateReceiverABI() - heimdallClient, b := setup(t, stateReceiverABI) - - event1 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 1, - ChainID: "80001", - Data: hexutil.MustDecode("0x01"), - }, - Time: time.Unix(50, 0), // block 2 - } - event2 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 2, - ChainID: "80001", - Data: hexutil.MustDecode("0x02"), - }, - Time: time.Unix(100, 0), // block 2 - } - event3 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 3, - ChainID: "80001", - Data: hexutil.MustDecode("0x03"), - }, - Time: time.Unix(200, 0), // block 4 - } - - events := []*heimdall.EventRecordWithTime{event1, event2, event3} - - heimdallClient.EXPECT().FetchStateSyncEvents(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(events, nil).Times(1) - heimdallClient.EXPECT().FetchStateSyncEvents(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]*heimdall.EventRecordWithTime{}, nil).AnyTimes() - - var wg sync.WaitGroup - wg.Add(1) - - go func(bridge bridge.Service) { - defer wg.Done() - - err := bridge.Run(ctx) - if err != nil { - if !errors.Is(err, ctx.Err()) { - t.Error(err) - } - - return - } - }(b) - - err := b.Synchronize(ctx, &types.Header{Number: big.NewInt(100)}) // hack to wait for b.ready - require.NoError(t, err) - - blocks := getBlocks(t, 5) - - err = b.ProcessNewBlocks(ctx, blocks) - require.NoError(t, err) - - res, err := b.GetEvents(ctx, 2) - require.NoError(t, err) - - event1Data, err := event1.Pack(stateReceiverABI) - require.NoError(t, err) - - event2Data, err := event2.Pack(stateReceiverABI) - require.NoError(t, err) - - require.Equal(t, 2, len(res)) // have first two events - require.Equal(t, event1Data, rlp.RawValue(res[0].Data())) // check data fields - require.Equal(t, event2Data, rlp.RawValue(res[1].Data())) - - res, err = b.GetEvents(ctx, 4) - require.NoError(t, err) - - event3Data, err := event3.Pack(stateReceiverABI) - require.NoError(t, err) - - require.Equal(t, 1, len(res)) - require.Equal(t, event3Data, rlp.RawValue(res[0].Data())) - - // get non-sprint block - _, err = b.GetEvents(ctx, 1) - require.Error(t, err) - - _, err = b.GetEvents(ctx, 3) - require.Error(t, err) - - // check block 0 - _, err = b.GetEvents(ctx, 0) - require.Error(t, err) - - cancel() - wg.Wait() -} - -func TestBridge_Unwind(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stateReceiverABI := bor.GenesisContractStateReceiverABI() - heimdallClient, b := setup(t, stateReceiverABI) - - event1 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 1, - ChainID: "80001", - Data: hexutil.MustDecode("0x01"), - }, - Time: time.Unix(50, 0), // block 2 - } - event2 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 2, - ChainID: "80001", - Data: hexutil.MustDecode("0x02"), - }, - Time: time.Unix(100, 0), // block 2 - } - event3 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 3, - ChainID: "80001", - Data: hexutil.MustDecode("0x03"), - }, - Time: time.Unix(200, 0), // block 4 - } - event4 := &heimdall.EventRecordWithTime{ - EventRecord: heimdall.EventRecord{ - ID: 4, - ChainID: "80001", - Data: hexutil.MustDecode("0x03"), - }, - Time: time.Unix(300, 0), // block 6 - } - - events := []*heimdall.EventRecordWithTime{event1, event2, event3, event4} - - heimdallClient.EXPECT().FetchStateSyncEvents(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(events, nil).Times(1) - heimdallClient.EXPECT().FetchStateSyncEvents(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]*heimdall.EventRecordWithTime{}, nil).AnyTimes() - - var wg sync.WaitGroup - wg.Add(1) - - go func(bridge bridge.Service) { - defer wg.Done() - - err := bridge.Run(ctx) - if err != nil { - if !errors.Is(err, ctx.Err()) { - t.Error(err) - } - - return - } - }(b) - - err := b.Synchronize(ctx, &types.Header{Number: big.NewInt(100)}) // hack to wait for b.ready - require.NoError(t, err) - - blocks := getBlocks(t, 8) - - err = b.ProcessNewBlocks(ctx, blocks) - require.NoError(t, err) - - event3Data, err := event3.Pack(stateReceiverABI) - require.NoError(t, err) - - res, err := b.GetEvents(ctx, 4) - require.Equal(t, event3Data, rlp.RawValue(res[0].Data())) - require.NoError(t, err) - - err = b.Unwind(ctx, &types.Header{Number: big.NewInt(3)}) - require.NoError(t, err) - - _, err = b.GetEvents(ctx, 4) - require.Error(t, err) - - cancel() - wg.Wait() -} diff --git a/polygon/bridge/db.go b/polygon/bridge/db.go index 64dc1374eff..8ba40b157fb 100644 --- a/polygon/bridge/db.go +++ b/polygon/bridge/db.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "encoding/binary" - "errors" - "fmt" "time" "github.com/ledgerwatch/erigon-lib/kv" @@ -14,19 +12,6 @@ import ( "github.com/ledgerwatch/erigon/polygon/polygoncommon" ) -/* - BorEventNums stores the last event ID of the last sprint. - - e.g. For block 10 with events [1,2,3], block 15 with events [4,5,6] and block 20 with events [7,8]. - The DB will have the following. - 10: 0 (initialized at zero, NOTE: Polygon does not have and event 0) - 15: 3 - 20: 6 - - To get the events for block 15, we look up the map for 15 and 20 and get back 3 and 6. So our - ID range is [4,6]. -*/ - var databaseTablesCfg = kv.TableCfg{ kv.BorEvents: {}, kv.BorEventNums: {}, @@ -125,6 +110,10 @@ func (s *MdbxStore) GetSprintLastEventID(ctx context.Context, lastID uint64, tim kLastID := make([]byte, 8) binary.BigEndian.PutUint64(kLastID, lastID) + if bytes.Equal(kLastID, kDBLast) { + return lastID, nil + } + _, _, err = cursor.Seek(kLastID) if err != nil { return eventID, err @@ -236,9 +225,6 @@ func (s *MdbxStore) StoreEventID(ctx context.Context, eventMap map[uint64]uint64 return tx.Commit() } -// GetEventIDRange returns the state sync event ID range for the given block number. -// An error is thrown if the block number is not found in the database. If the given block -// number is the last in the database, then the second uint64 (representing end ID) is 0. func (s *MdbxStore) GetEventIDRange(ctx context.Context, blockNum uint64) (uint64, uint64, error) { var start, end uint64 @@ -251,31 +237,22 @@ func (s *MdbxStore) GetEventIDRange(ctx context.Context, blockNum uint64) (uint6 kByte := make([]byte, 8) binary.BigEndian.PutUint64(kByte, blockNum) - cursor, err := tx.Cursor(kv.BorEventNums) - if err != nil { - return start, end, err - } - - _, v, err := cursor.SeekExact(kByte) - if err != nil { - return start, end, err - } - if v == nil { // we don't have a map - return start, end, errors.New(fmt.Sprintf("map not available for block %d", blockNum)) - } - - err = binary.Read(bytes.NewReader(v), binary.BigEndian, &start) + it, err := tx.RangeAscend(kv.BorEventNums, kByte, nil, 2) if err != nil { return start, end, err } - _, v, err = cursor.Next() - if err != nil { - return start, end, err - } + for it.HasNext() { + _, v, err := it.Next() + if err != nil { + return start, end, err + } - if v != nil { // may be empty if blockNum is the last entry - err = binary.Read(bytes.NewReader(v), binary.BigEndian, &end) + if start == 0 { + err = binary.Read(bytes.NewReader(v), binary.BigEndian, &start) + } else { + err = binary.Read(bytes.NewReader(v), binary.BigEndian, &end) + } if err != nil { return start, end, err } @@ -294,21 +271,22 @@ func (s *MdbxStore) PruneEventIDs(ctx context.Context, blockNum uint64) error { kByte := make([]byte, 8) binary.BigEndian.PutUint64(kByte, blockNum) - cursor, err := tx.Cursor(kv.BorEventNums) + it, err := tx.RangeDescend(kv.BorEvents, nil, kByte, 0) if err != nil { return err } - defer cursor.Close() - var k []byte - for k, _, err = cursor.Seek(kByte); err == nil && k != nil; k, _, err = cursor.Next() { - if err := tx.Delete(kv.BorEventNums, k); err != nil { + for it.HasNext() { + k, _, err := it.Next() + if err != nil { + return err + } + + err = tx.Delete(kv.BorEventNums, k) + if err != nil { return err } - } - if err != nil { - return err } - return tx.Commit() + return nil }