Skip to content

Commit

Permalink
move bridge client to separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
Thegaram authored and NazariiDenha committed Apr 17, 2023
1 parent d3dcf87 commit 0b220be
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 144 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)

eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.blockchain, eth.chainDb)
eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb)
if err != nil {
return nil, fmt.Errorf("cannot initialize sync service: %w", err)
}
Expand Down
146 changes: 146 additions & 0 deletions rollup/sync_service/bridge_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package sync_service

import (
"context"
"errors"
"fmt"
"math/big"

"github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rpc"
)

type BridgeClient struct {
client *ethclient.Client
confirmations rpc.BlockNumber
l1MessageQueueAddress common.Address
}

func newBridgeClient(ctx context.Context, l1Endpoint string, l1ChainId uint64, confirmations rpc.BlockNumber, l1MessageQueueAddress *common.Address) (*BridgeClient, error) {
if l1MessageQueueAddress == nil {
return nil, errors.New("must pass l1MessageQueueAddress to BridgeClient")
}

ethClient, err := ethclient.Dial(l1Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to dial L1 endpoint: %w", err)
}

// sanity check: compare chain IDs
got, err := ethClient.ChainID(ctx)
if err != nil {
return nil, err
}
if got.Uint64() != l1ChainId { // TODO
return nil, fmt.Errorf("unexpected chain ID, expected = %v, got = %v", l1ChainId, got)
}

client := BridgeClient{
client: ethClient,
confirmations: confirmations,
l1MessageQueueAddress: *l1MessageQueueAddress,
}

return &client, nil
}

func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, error) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(0).SetUint64(from),
ToBlock: big.NewInt(0).SetUint64(to),
Addresses: []common.Address{
c.l1MessageQueueAddress,
},
Topics: [][]common.Hash{
{L1QueueTransactionEventSignature},
},
}

logs, err := c.client.FilterLogs(ctx, query)
if err != nil {
log.Warn("eth_getLogs failed", "err", err)
return nil, err
}

if len(logs) == 0 {
return nil, nil
}

log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs))

msgs, err := c.parseLogs(logs)
if err != nil {
log.Error("Failed to parse emitted events logs", "err", err)
return nil, err
}

return msgs, nil
}

func (c *BridgeClient) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) {
var msgs []types.L1MessageTx

for _, vLog := range logs {
event := L1QueueTransactionEvent{}
err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog)
if err != nil {
log.Warn("Failed to unpack L1 QueueTransaction event", "err", err)
return msgs, err
}

msgs = append(msgs, types.L1MessageTx{
Nonce: event.QueueIndex.Uint64(),
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Value: event.Value,
Data: event.Data,
Sender: &event.Sender,
})
}

return msgs, nil
}

// getLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type.
func (c *BridgeClient) getLatestConfirmedBlockNumber(ctx context.Context) (uint64, error) {
if c.confirmations == rpc.SafeBlockNumber || c.confirmations == rpc.FinalizedBlockNumber {
var tag *big.Int
if c.confirmations == rpc.FinalizedBlockNumber {
tag = big.NewInt(int64(rpc.FinalizedBlockNumber))
} else {
tag = big.NewInt(int64(rpc.SafeBlockNumber))
}

header, err := c.client.HeaderByNumber(ctx, tag)
if err != nil {
return 0, err
}
if !header.Number.IsInt64() {
return 0, fmt.Errorf("received invalid block confirm: %v", header.Number)
}
return header.Number.Uint64(), nil
} else if c.confirmations == rpc.LatestBlockNumber {
number, err := c.client.BlockNumber(ctx)
if err != nil {
return 0, err
}
return number, nil
} else if c.confirmations.Int64() >= 0 { // If it's positive integer, consider it as a certain confirm value.
number, err := c.client.BlockNumber(ctx)
if err != nil {
return 0, err
}
cfmNum := uint64(c.confirmations.Int64())

if number >= cfmNum {
return number - cfmNum, nil
}
return 0, nil
} else {
return 0, fmt.Errorf("unknown confirmation type: %v", c.confirmations)
}
}
116 changes: 17 additions & 99 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,34 @@ package sync_service

import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/node"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rpc"
)

const FetchLimit = uint64(20)
const PollInterval = time.Second * 15

type SyncService struct {
bc *core.BlockChain
cancel context.CancelFunc
client *ethclient.Client
confirmations rpc.BlockNumber
ctx context.Context
db ethdb.Database
l1MessageQueueAddress *common.Address
latestProcessedBlock uint64
pollInterval time.Duration
cancel context.CancelFunc
client *BridgeClient
ctx context.Context
db ethdb.Database
latestProcessedBlock uint64
pollInterval time.Duration
}

func NewSyncService(ctx context.Context, config *params.ChainConfig, nodeConfig *node.Config, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) {
if bc == nil {
return nil, errors.New("must pass BlockChain to SyncService")
}

client, err := ethclient.Dial(nodeConfig.L1Endpoint)
func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database) (*SyncService, error) {
client, err := newBridgeClient(ctx, nodeConfig.L1Endpoint, genesisConfig.L1Config.L1ChainId, nodeConfig.L1Confirmations, genesisConfig.L1Config.L1MessageQueueAddress)
if err != nil {
return nil, err
}

// sanity check: compare chain IDs
chainId, err := client.ChainID(ctx)
if err != nil {
return nil, err
}
if chainId.Uint64() != config.L1Config.L1ChainId {
return nil, fmt.Errorf("unexpected chain ID, expected = %v, got = %v", config.L1Config.L1ChainId, chainId)
return nil, fmt.Errorf("failed to initialize bridge client: %w", err)
}

// restart from latest synced block number
Expand All @@ -63,15 +41,12 @@ func NewSyncService(ctx context.Context, config *params.ChainConfig, nodeConfig
ctx, cancel := context.WithCancel(ctx)

service := SyncService{
bc: bc,
cancel: cancel,
client: client,
confirmations: nodeConfig.L1Confirmations,
ctx: ctx,
db: db,
l1MessageQueueAddress: config.L1Config.L1MessageQueueAddress,
latestProcessedBlock: latestProcessedBlock.Uint64(), // TODO
pollInterval: PollInterval,
cancel: cancel,
client: client,
ctx: ctx,
db: db,
latestProcessedBlock: latestProcessedBlock.Uint64(), // TODO
pollInterval: PollInterval,
}

return &service, nil
Expand Down Expand Up @@ -100,7 +75,7 @@ func (s *SyncService) Stop() {
}

func (s *SyncService) fetchMessages() {
latestConfirmed, err := GetLatestConfirmedBlockNumber(s.ctx, s.client, s.confirmations)
latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err != nil {
log.Warn("failed to get latest confirmed block number", "err", err)
return
Expand All @@ -120,7 +95,7 @@ func (s *SyncService) fetchMessages() {
to = latestConfirmed
}

msgs, err := s.fetchMessagesInRange(from, to)
msgs, err := s.client.fetchMessagesInRange(s.ctx, from, to)
if err != nil {
log.Warn("failed to fetch messages in range", "err", err)
return
Expand All @@ -133,63 +108,6 @@ func (s *SyncService) fetchMessages() {
}
}

func (s *SyncService) fetchMessagesInRange(from, to uint64) ([]types.L1MessageTx, error) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(0).SetUint64(from),
ToBlock: big.NewInt(0).SetUint64(to),
Addresses: []common.Address{
*s.l1MessageQueueAddress,
},
Topics: [][]common.Hash{
{L1QueueTransactionEventSignature},
},
}

logs, err := s.client.FilterLogs(s.ctx, query)
if err != nil {
log.Warn("eth_getLogs failed", "err", err)
return nil, err
}

if len(logs) == 0 {
return nil, nil
}

log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs))

msgs, err := s.parseLogs(logs)
if err != nil {
log.Error("Failed to parse emitted events logs", "err", err)
return nil, err
}

return msgs, nil
}

func (s *SyncService) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) {
var msgs []types.L1MessageTx

for _, vLog := range logs {
event := L1QueueTransactionEvent{}
err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog)
if err != nil {
log.Warn("Failed to unpack L1 QueueTransaction event", "err", err)
return msgs, err
}

msgs = append(msgs, types.L1MessageTx{
Nonce: event.QueueIndex.Uint64(),
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Value: event.Value,
Data: event.Data,
Sender: &event.Sender,
})
}

return msgs, nil
}

func (s *SyncService) SetLatestSyncedL1BlockNumber(number uint64) {
rawdb.WriteSyncedL1BlockNumber(s.db, big.NewInt(0).SetUint64(number))
}
Expand Down
44 changes: 0 additions & 44 deletions rollup/sync_service/util.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package sync_service

import (
"context"
"fmt"
"math/big"

"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/rpc"
)

// UnpackLog unpacks a retrieved log into the provided output structure.
Expand All @@ -30,43 +26,3 @@ func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error {
}
return abi.ParseTopics(out, indexed, log.Topics[1:])
}

// GetLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type.
func GetLatestConfirmedBlockNumber(ctx context.Context, client *ethclient.Client, confirm rpc.BlockNumber) (uint64, error) {
if confirm == rpc.SafeBlockNumber || confirm == rpc.FinalizedBlockNumber {
var tag *big.Int
if confirm == rpc.FinalizedBlockNumber {
tag = big.NewInt(int64(rpc.FinalizedBlockNumber))
} else {
tag = big.NewInt(int64(rpc.SafeBlockNumber))
}

header, err := client.HeaderByNumber(ctx, tag)
if err != nil {
return 0, err
}
if !header.Number.IsInt64() {
return 0, fmt.Errorf("received invalid block confirm: %v", header.Number)
}
return header.Number.Uint64(), nil
} else if confirm == rpc.LatestBlockNumber {
number, err := client.BlockNumber(ctx)
if err != nil {
return 0, err
}
return number, nil
} else if confirm.Int64() >= 0 { // If it's positive integer, consider it as a certain confirm value.
number, err := client.BlockNumber(ctx)
if err != nil {
return 0, err
}
cfmNum := uint64(confirm.Int64())

if number >= cfmNum {
return number - cfmNum, nil
}
return 0, nil
} else {
return 0, fmt.Errorf("unknown confirmation type: %v", confirm)
}
}

0 comments on commit 0b220be

Please sign in to comment.