Skip to content

Commit

Permalink
add L1 message sync service
Browse files Browse the repository at this point in the history
  • Loading branch information
Thegaram authored and NazariiDenha committed Apr 17, 2023
1 parent 2c70f51 commit 5305e8a
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 0 deletions.
9 changes: 9 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package eth

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -54,6 +55,7 @@ import (
"github.com/scroll-tech/go-ethereum/p2p/enode"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rlp"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
"github.com/scroll-tech/go-ethereum/rpc"
)

Expand All @@ -67,6 +69,7 @@ type Ethereum struct {

// Handlers
txPool *core.TxPool
syncService *sync_service.SyncService
blockchain *core.BlockChain
handler *handler
ethDialCandidates enode.Iterator
Expand Down Expand Up @@ -206,6 +209,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)

eth.syncService, err = sync_service.NewSyncService(context.Background(), eth.blockchain, eth.chainDb)
if err != nil {
return nil, fmt.Errorf("cannot initialize sync service: %w", err)
}

// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
checkpoint := config.Checkpoint
Expand Down Expand Up @@ -555,6 +563,7 @@ func (s *Ethereum) Stop() error {
s.bloomIndexer.Close()
close(s.closeBloomHandler)
s.txPool.Stop()
s.syncService.Stop()
s.miner.Close()
s.blockchain.Stop()
s.engine.Close()
Expand Down
39 changes: 39 additions & 0 deletions rollup/sync_service/bridge_abi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sync_service

import (
"math/big"

"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
"github.com/scroll-tech/go-ethereum/common"
)

var (
// L1MessageQueueABI holds information about L1MessageQueue contract's context and available invokable methods.
L1MessageQueueABI *abi.ABI

// L1QueueTransactionEventSignature = keccak256("QueueTransaction(address,address,uint256,uint256,uint256,bytes)")
L1QueueTransactionEventSignature common.Hash
)

func init() {
L1MessageQueueABI, _ = L1MessageQueueMetaData.GetAbi()
L1QueueTransactionEventSignature = L1MessageQueueABI.Events["QueueTransaction"].ID
}

// Generated manually from abigen and only necessary events and mutable calls are kept.

// L1MessageQueueMetaData contains all meta data concerning the L1MessageQueue contract.
var L1MessageQueueMetaData = &bind.MetaData{
ABI: "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"queueIndex\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"QueueTransaction\",\"type\":\"event\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"appendCrossDomainMessage\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"appendEnforcedTransaction\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"bytes\",\"name\":\"message\",\"type\":\"bytes\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"}],\"name\":\"estimateCrossDomainMessageFee\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"uint256\",\"name\":\"queueIndex\",\"type\":\"uint256\"}],\"name\":\"getCrossDomainMessage\",\"outputs\":[{\"internalType\":\"bytes32\",\"name\":\"\",\"type\":\"bytes32\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"nextCrossDomainMessageIndex\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"}]",
}

// L1QueueTransactionEvent represents a QueueTransaction event raised by the L1MessageQueue contract.
type L1QueueTransactionEvent struct {
Sender common.Address
Target common.Address
Value *big.Int
QueueIndex *big.Int
GasLimit *big.Int
Data []byte
}
180 changes: 180 additions & 0 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package sync_service

import (
"context"
"errors"
"math/big"
"time"

"github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
)

const FetchLimit = uint64(20)
const PollInterval = time.Second * 15

type SyncService struct {
bc *core.BlockChain
cancel context.CancelFunc
client *ethclient.Client
ctx context.Context
db ethdb.Database
latestProcessedBlock uint64
pollInterval time.Duration
}

func NewSyncService(ctx context.Context, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) {
if bc == nil {
return nil, errors.New("must pass BlockChain to SyncService")
}

client, err := ethclient.Dial("") // cfg.L1Config.Endpoint
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
latestProcessedBlock := rawdb.ReadSyncedL1BlockNumber(db).Uint64() // TODO

service := SyncService{
bc: bc,
cancel: cancel,
client: client,
ctx: ctx,
db: db,
latestProcessedBlock: latestProcessedBlock,
pollInterval: PollInterval,
}

return &service, nil
}

func (s *SyncService) Start() {
t := time.NewTicker(s.pollInterval)
defer t.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
s.fetchMessages()
}
}
}

func (s *SyncService) Stop() error {
log.Info("Stopping sync service")

if s.cancel != nil {
defer s.cancel()
}
return nil
}

func (s *SyncService) fetchMessages() error {
// TODO
latestConfirmed, err := s.client.BlockNumber(s.ctx)
if err != nil {
log.Warn("eth_blockNumber failed", "err", err)
return nil
}

// query in batches
for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += FetchLimit {
select {
case <-s.ctx.Done():
return nil
default:
}

to := from + FetchLimit - 1

if to > latestConfirmed {
to = latestConfirmed
}

msgs, err := s.fetchMessagesInRange(from, to)
if err != nil {
return err
}

s.StoreMessages(msgs)

s.latestProcessedBlock = to
s.SetLatestSyncedL1BlockNumber(to)
}

return nil
}

func (s *SyncService) fetchMessagesInRange(from, to uint64) ([]types.L1MessageTx, error) {
query := ethereum.FilterQuery{
FromBlock: big.NewInt(0).SetUint64(from),
ToBlock: big.NewInt(0).SetUint64(to),
// Addresses: ,
Topics: [][]common.Hash{
{L1QueueTransactionEventSignature},
},
}

logs, err := s.client.FilterLogs(s.ctx, query)
if err != nil {
log.Warn("eth_getLogs failed", "err", err)
return nil, err
}

if len(logs) == 0 {
return nil, nil
}

log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs))

msgs, err := s.parseLogs(logs)
if err != nil {
log.Error("Failed to parse emitted events logs", "err", err)
return nil, err
}

return msgs, nil
}

func (s *SyncService) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) {
var msgs []types.L1MessageTx

for _, vLog := range logs {
event := L1QueueTransactionEvent{}
err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog)
if err != nil {
log.Warn("Failed to unpack L1 QueueTransaction event", "err", err)
return msgs, err
}

msgs = append(msgs, types.L1MessageTx{
Nonce: event.QueueIndex.Uint64(),
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Value: event.Value,
Data: event.Data,
Sender: &event.Sender,
})
}

return msgs, nil
}

func (s *SyncService) SetLatestSyncedL1BlockNumber(number uint64) {
rawdb.WriteSyncedL1BlockNumber(s.db, big.NewInt(0).SetUint64(number))
}

func (s *SyncService) StoreMessages(msgs []types.L1MessageTx) {
if len(msgs) > 0 {
rawdb.WriteL1Messages(s.db, msgs)
}
}
28 changes: 28 additions & 0 deletions rollup/sync_service/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package sync_service

import (
"fmt"

"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/core/types"
)

// UnpackLog unpacks a retrieved log into the provided output structure.
// @todo: add unit test.
func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error {
if log.Topics[0] != c.Events[event].ID {
return fmt.Errorf("event signature mismatch")
}
if len(log.Data) > 0 {
if err := c.UnpackIntoInterface(out, event, log.Data); err != nil {
return err
}
}
var indexed abi.Arguments
for _, arg := range c.Events[event].Inputs {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
return abi.ParseTopics(out, indexed, log.Topics[1:])
}

0 comments on commit 5305e8a

Please sign in to comment.