diff --git a/eth/backend.go b/eth/backend.go index 765ee347ae6e..66094f9a978b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -209,7 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) - eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.blockchain, eth.chainDb) + eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb) if err != nil { return nil, fmt.Errorf("cannot initialize sync service: %w", err) } diff --git a/rollup/sync_service/bridge_client.go b/rollup/sync_service/bridge_client.go new file mode 100644 index 000000000000..bd0dc5796f5a --- /dev/null +++ b/rollup/sync_service/bridge_client.go @@ -0,0 +1,146 @@ +package sync_service + +import ( + "context" + "errors" + "fmt" + "math/big" + + "github.com/scroll-tech/go-ethereum" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rpc" +) + +type BridgeClient struct { + client *ethclient.Client + confirmations rpc.BlockNumber + l1MessageQueueAddress common.Address +} + +func newBridgeClient(ctx context.Context, l1Endpoint string, l1ChainId uint64, confirmations rpc.BlockNumber, l1MessageQueueAddress *common.Address) (*BridgeClient, error) { + if l1MessageQueueAddress == nil { + return nil, errors.New("must pass l1MessageQueueAddress to BridgeClient") + } + + ethClient, err := ethclient.Dial(l1Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to dial L1 endpoint: %w", err) + } + + // sanity check: compare chain IDs + got, err := ethClient.ChainID(ctx) + if err != nil { + return nil, err + } + if got.Uint64() != l1ChainId { // TODO + return nil, fmt.Errorf("unexpected chain ID, expected = %v, got = %v", l1ChainId, got) + } + + client := BridgeClient{ + client: ethClient, + confirmations: confirmations, + l1MessageQueueAddress: *l1MessageQueueAddress, + } + + return &client, nil +} + +func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, error) { + query := ethereum.FilterQuery{ + FromBlock: big.NewInt(0).SetUint64(from), + ToBlock: big.NewInt(0).SetUint64(to), + Addresses: []common.Address{ + c.l1MessageQueueAddress, + }, + Topics: [][]common.Hash{ + {L1QueueTransactionEventSignature}, + }, + } + + logs, err := c.client.FilterLogs(ctx, query) + if err != nil { + log.Warn("eth_getLogs failed", "err", err) + return nil, err + } + + if len(logs) == 0 { + return nil, nil + } + + log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs)) + + msgs, err := c.parseLogs(logs) + if err != nil { + log.Error("Failed to parse emitted events logs", "err", err) + return nil, err + } + + return msgs, nil +} + +func (c *BridgeClient) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) { + var msgs []types.L1MessageTx + + for _, vLog := range logs { + event := L1QueueTransactionEvent{} + err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog) + if err != nil { + log.Warn("Failed to unpack L1 QueueTransaction event", "err", err) + return msgs, err + } + + msgs = append(msgs, types.L1MessageTx{ + Nonce: event.QueueIndex.Uint64(), + Gas: event.GasLimit.Uint64(), + To: &event.Target, + Value: event.Value, + Data: event.Data, + Sender: &event.Sender, + }) + } + + return msgs, nil +} + +// getLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type. +func (c *BridgeClient) getLatestConfirmedBlockNumber(ctx context.Context) (uint64, error) { + if c.confirmations == rpc.SafeBlockNumber || c.confirmations == rpc.FinalizedBlockNumber { + var tag *big.Int + if c.confirmations == rpc.FinalizedBlockNumber { + tag = big.NewInt(int64(rpc.FinalizedBlockNumber)) + } else { + tag = big.NewInt(int64(rpc.SafeBlockNumber)) + } + + header, err := c.client.HeaderByNumber(ctx, tag) + if err != nil { + return 0, err + } + if !header.Number.IsInt64() { + return 0, fmt.Errorf("received invalid block confirm: %v", header.Number) + } + return header.Number.Uint64(), nil + } else if c.confirmations == rpc.LatestBlockNumber { + number, err := c.client.BlockNumber(ctx) + if err != nil { + return 0, err + } + return number, nil + } else if c.confirmations.Int64() >= 0 { // If it's positive integer, consider it as a certain confirm value. + number, err := c.client.BlockNumber(ctx) + if err != nil { + return 0, err + } + cfmNum := uint64(c.confirmations.Int64()) + + if number >= cfmNum { + return number - cfmNum, nil + } + return 0, nil + } else { + return 0, fmt.Errorf("unknown confirmation type: %v", c.confirmations) + } +} diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index ef5211d2b6f4..a4bf438bdcb4 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -2,56 +2,34 @@ package sync_service import ( "context" - "errors" "fmt" "math/big" "time" - "github.com/scroll-tech/go-ethereum" - "github.com/scroll-tech/go-ethereum/common" - "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/ethclient" "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/node" "github.com/scroll-tech/go-ethereum/params" - "github.com/scroll-tech/go-ethereum/rpc" ) const FetchLimit = uint64(20) const PollInterval = time.Second * 15 type SyncService struct { - bc *core.BlockChain - cancel context.CancelFunc - client *ethclient.Client - confirmations rpc.BlockNumber - ctx context.Context - db ethdb.Database - l1MessageQueueAddress *common.Address - latestProcessedBlock uint64 - pollInterval time.Duration + cancel context.CancelFunc + client *BridgeClient + ctx context.Context + db ethdb.Database + latestProcessedBlock uint64 + pollInterval time.Duration } -func NewSyncService(ctx context.Context, config *params.ChainConfig, nodeConfig *node.Config, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) { - if bc == nil { - return nil, errors.New("must pass BlockChain to SyncService") - } - - client, err := ethclient.Dial(nodeConfig.L1Endpoint) +func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database) (*SyncService, error) { + client, err := newBridgeClient(ctx, nodeConfig.L1Endpoint, genesisConfig.L1Config.L1ChainId, nodeConfig.L1Confirmations, genesisConfig.L1Config.L1MessageQueueAddress) if err != nil { - return nil, err - } - - // sanity check: compare chain IDs - chainId, err := client.ChainID(ctx) - if err != nil { - return nil, err - } - if chainId.Uint64() != config.L1Config.L1ChainId { - return nil, fmt.Errorf("unexpected chain ID, expected = %v, got = %v", config.L1Config.L1ChainId, chainId) + return nil, fmt.Errorf("failed to initialize bridge client: %w", err) } // restart from latest synced block number @@ -63,15 +41,12 @@ func NewSyncService(ctx context.Context, config *params.ChainConfig, nodeConfig ctx, cancel := context.WithCancel(ctx) service := SyncService{ - bc: bc, - cancel: cancel, - client: client, - confirmations: nodeConfig.L1Confirmations, - ctx: ctx, - db: db, - l1MessageQueueAddress: config.L1Config.L1MessageQueueAddress, - latestProcessedBlock: latestProcessedBlock.Uint64(), // TODO - pollInterval: PollInterval, + cancel: cancel, + client: client, + ctx: ctx, + db: db, + latestProcessedBlock: latestProcessedBlock.Uint64(), // TODO + pollInterval: PollInterval, } return &service, nil @@ -100,7 +75,7 @@ func (s *SyncService) Stop() { } func (s *SyncService) fetchMessages() { - latestConfirmed, err := GetLatestConfirmedBlockNumber(s.ctx, s.client, s.confirmations) + latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) return @@ -120,7 +95,7 @@ func (s *SyncService) fetchMessages() { to = latestConfirmed } - msgs, err := s.fetchMessagesInRange(from, to) + msgs, err := s.client.fetchMessagesInRange(s.ctx, from, to) if err != nil { log.Warn("failed to fetch messages in range", "err", err) return @@ -133,63 +108,6 @@ func (s *SyncService) fetchMessages() { } } -func (s *SyncService) fetchMessagesInRange(from, to uint64) ([]types.L1MessageTx, error) { - query := ethereum.FilterQuery{ - FromBlock: big.NewInt(0).SetUint64(from), - ToBlock: big.NewInt(0).SetUint64(to), - Addresses: []common.Address{ - *s.l1MessageQueueAddress, - }, - Topics: [][]common.Hash{ - {L1QueueTransactionEventSignature}, - }, - } - - logs, err := s.client.FilterLogs(s.ctx, query) - if err != nil { - log.Warn("eth_getLogs failed", "err", err) - return nil, err - } - - if len(logs) == 0 { - return nil, nil - } - - log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs)) - - msgs, err := s.parseLogs(logs) - if err != nil { - log.Error("Failed to parse emitted events logs", "err", err) - return nil, err - } - - return msgs, nil -} - -func (s *SyncService) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) { - var msgs []types.L1MessageTx - - for _, vLog := range logs { - event := L1QueueTransactionEvent{} - err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog) - if err != nil { - log.Warn("Failed to unpack L1 QueueTransaction event", "err", err) - return msgs, err - } - - msgs = append(msgs, types.L1MessageTx{ - Nonce: event.QueueIndex.Uint64(), - Gas: event.GasLimit.Uint64(), - To: &event.Target, - Value: event.Value, - Data: event.Data, - Sender: &event.Sender, - }) - } - - return msgs, nil -} - func (s *SyncService) SetLatestSyncedL1BlockNumber(number uint64) { rawdb.WriteSyncedL1BlockNumber(s.db, big.NewInt(0).SetUint64(number)) } diff --git a/rollup/sync_service/util.go b/rollup/sync_service/util.go index c5acbbcd05de..1d3857eccef9 100644 --- a/rollup/sync_service/util.go +++ b/rollup/sync_service/util.go @@ -1,14 +1,10 @@ package sync_service import ( - "context" "fmt" - "math/big" "github.com/scroll-tech/go-ethereum/accounts/abi" "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/ethclient" - "github.com/scroll-tech/go-ethereum/rpc" ) // UnpackLog unpacks a retrieved log into the provided output structure. @@ -30,43 +26,3 @@ func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error { } return abi.ParseTopics(out, indexed, log.Topics[1:]) } - -// GetLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type. -func GetLatestConfirmedBlockNumber(ctx context.Context, client *ethclient.Client, confirm rpc.BlockNumber) (uint64, error) { - if confirm == rpc.SafeBlockNumber || confirm == rpc.FinalizedBlockNumber { - var tag *big.Int - if confirm == rpc.FinalizedBlockNumber { - tag = big.NewInt(int64(rpc.FinalizedBlockNumber)) - } else { - tag = big.NewInt(int64(rpc.SafeBlockNumber)) - } - - header, err := client.HeaderByNumber(ctx, tag) - if err != nil { - return 0, err - } - if !header.Number.IsInt64() { - return 0, fmt.Errorf("received invalid block confirm: %v", header.Number) - } - return header.Number.Uint64(), nil - } else if confirm == rpc.LatestBlockNumber { - number, err := client.BlockNumber(ctx) - if err != nil { - return 0, err - } - return number, nil - } else if confirm.Int64() >= 0 { // If it's positive integer, consider it as a certain confirm value. - number, err := client.BlockNumber(ctx) - if err != nil { - return 0, err - } - cfmNum := uint64(confirm.Int64()) - - if number >= cfmNum { - return number - cfmNum, nil - } - return 0, nil - } else { - return 0, fmt.Errorf("unknown confirmation type: %v", confirm) - } -}