Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: blockhash mismatch #913

Draft
wants to merge 13 commits into
base: feat/sync-directly-from-da
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions common/backoff/exponential.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package backoff

import (
"math"
"math/rand"
"time"
)

type Exponential struct {
attempt int

maxJitter time.Duration

min time.Duration
max time.Duration
}

func NewExponential(minimum, maximum, maxJitter time.Duration) *Exponential {
return &Exponential{
min: minimum,
max: maximum,
maxJitter: maxJitter,
}
}

func (e *Exponential) NextDuration() time.Duration {
var jitter time.Duration
if e.maxJitter > 0 {
jitter = time.Duration(rand.Int63n(e.maxJitter.Nanoseconds()))
}

minFloat := float64(e.min)
duration := math.Pow(2, float64(e.attempt)) * minFloat

// limit at configured maximum
if duration > float64(e.max) {
duration = float64(e.max)
}

e.attempt++
return time.Duration(duration) + jitter
}

func (e *Exponential) Reset() {
e.attempt = 0
}

func (e *Exponential) Attempt() int {
return e.attempt
}
39 changes: 39 additions & 0 deletions common/backoff/exponential_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package backoff

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestExponentialBackoff verifies the doubling delay sequence with its max cap,
// the jitter bounds, and the edge case where min exceeds max.
func TestExponentialBackoff(t *testing.T) {
	t.Run("Multiple attempts", func(t *testing.T) {
		b := NewExponential(100*time.Millisecond, 10*time.Second, 0)
		wanted := []time.Duration{
			100 * time.Millisecond,
			200 * time.Millisecond,
			400 * time.Millisecond,
			800 * time.Millisecond,
			1600 * time.Millisecond,
			3200 * time.Millisecond,
			6400 * time.Millisecond,
			10 * time.Second, // capped at max
		}
		for attempt, want := range wanted {
			require.Equal(t, want, b.NextDuration(), "attempt %d", attempt)
		}
	})

	t.Run("Jitter added", func(t *testing.T) {
		b := NewExponential(1*time.Second, 10*time.Second, 1*time.Second)
		got := b.NextDuration()
		require.GreaterOrEqual(t, got, 1*time.Second)
		require.Less(t, got, 2*time.Second)
	})

	t.Run("Edge case: min > max", func(t *testing.T) {
		b := NewExponential(10*time.Second, 5*time.Second, 0)
		require.Equal(t, 5*time.Second, b.NextDuration())
	})
}
52 changes: 38 additions & 14 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,25 +1803,49 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}

// PreprocessBlock processes block on top of the chain to calculate receipts, bloom and state root
func (bc *BlockChain) PreprocessBlock(block *types.Block) (common.Hash, types.Bloom, common.Hash, uint64, error) {
// Retrieve the parent block and its state to execute on top
parent := bc.CurrentBlock().Header()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errInsertionInterrupted
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
defer bc.chainmu.Unlock()

statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
return common.Hash{}, types.Bloom{}, common.Hash{}, 0, err
return NonStatTy, err
}
receipts, _, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)

header.ParentHash = parentBlock.Hash()

tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig)
if err != nil {
return common.Hash{}, types.Bloom{}, common.Hash{}, 0, err
return NonStatTy, err
}
receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
bloom := types.CreateBloom(receipts)
stateRoot := statedb.GetRootHash()
return receiptSha, bloom, stateRoot, usedGas, nil

header.GasUsed = gasUsed
header.Root = statedb.GetRootHash()
// Since we're using Clique consensus, we don't have uncles
header.UncleHash = types.EmptyUncleHash

fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil))

blockHash := fullBlock.Hash()
// manually replace the block hash in the receipts
for i, receipt := range receipts {
// add block location fields
receipt.BlockHash = blockHash
receipt.BlockNumber = tempBlock.Number()
receipt.TransactionIndex = uint(i)

for _, l := range receipt.Logs {
l.BlockHash = blockHash
}
}
for _, l := range logs {
l.BlockHash = blockHash
}

return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
Expand Down
22 changes: 14 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)

// Initialize and start DA syncing pipeline before SyncService as SyncService is blocking until all L1 messages are loaded.
// We need SyncService to load the L1 messages for DA syncing, but since both sync from last known L1 state, we can
// simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline
// by waiting and retrying.
if config.EnableDASyncing {
// TODO: set proper default for data dir and enable setting via flag
config.DA.AdditionalDataDir = stack.Config().DataDir
eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize da syncer: %w", err)
}
eth.syncingPipeline.Start()
}

// initialize and start L1 message sync service
eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client)
if err != nil {
Expand All @@ -237,14 +251,6 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
eth.rollupSyncService.Start()
}

if config.EnableDASyncing {
eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize da syncer: %w", err)
}
eth.syncingPipeline.Start()
}

// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
checkpoint := config.Checkpoint
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1
github.com/rs/cors v1.7.0
github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f
github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140
github.com/scroll-tech/zktrie v0.8.4
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f h1:ZKPhn674+2AgBdIn2ZLGePsUZdM2823m2tJp+JlQf/Y=
github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f/go.mod h1:O9jsbQGNnTEfyfZg7idevq6jGGSQshX70elX+TRH8vU=
github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140 h1:zoFmDxrK984L9GUu/jsnk5mqKG3S4QItzFUmtUYhMdk=
github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140/go.mod h1:O9jsbQGNnTEfyfZg7idevq6jGGSQshX70elX+TRH8vU=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
Expand Down
15 changes: 12 additions & 3 deletions params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"golang.org/x/crypto/sha3"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/rollup/missing_header_fields"
"github.com/scroll-tech/go-ethereum/rollup/rcfg"
)

Expand Down Expand Up @@ -337,7 +338,9 @@ var (
ScrollChainAddress: common.HexToAddress("0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0"),
},
DAConfig: &DAConfig{
BlobScanAPIEndpoint: "https://api.sepolia.blobscan.com/blobs/",
BlobScanAPIEndpoint: "https://api.sepolia.blobscan.com/blobs/",
MissingHeaderFieldsURL: "", // TODO: Add missing header fields URL and correct checksum
MissingHeaderFieldsSHA256: missing_header_fields.SHA256ChecksumFromHex("0xdcdae1c92c59c307edae24216eb06c4566f512739aec39dca1abd53c597102c7"),
},
},
}
Expand Down Expand Up @@ -379,8 +382,10 @@ var (
ScrollChainAddress: common.HexToAddress("0xa13BAF47339d63B743e7Da8741db5456DAc1E556"),
},
DAConfig: &DAConfig{
BlobScanAPIEndpoint: "https://api.blobscan.com/blobs/",
BlockNativeAPIEndpoint: "https://api.ethernow.xyz/v1/blob/",
BlobScanAPIEndpoint: "https://api.blobscan.com/blobs/",
BlockNativeAPIEndpoint: "https://api.ethernow.xyz/v1/blob/",
MissingHeaderFieldsURL: "", // TODO: Add missing header fields URL and correct checksum
MissingHeaderFieldsSHA256: missing_header_fields.SHA256ChecksumFromHex("0xdcdae1c92c59c307edae24216eb06c4566f512739aec39dca1abd53c597102c7"),
},
},
}
Expand Down Expand Up @@ -675,6 +680,10 @@ type L1Config struct {
type DAConfig struct {
BlobScanAPIEndpoint string `json:"blobScanApiEndpoint,omitempty"`
BlockNativeAPIEndpoint string `json:"blockNativeApiEndpoint,omitempty"`
// MissingHeaderFieldsURL is the URL from which to download the file containing the historical missing header fields.
MissingHeaderFieldsURL string `json:"missingHeaderFieldsURL,omitempty"`
// MissingHeaderFieldsSHA256 is the SHA256 hash of the file containing the historical missing header fields.
MissingHeaderFieldsSHA256 missing_header_fields.SHA256Checksum `json:"missingHeaderFieldsSHA256,omitempty"`
}

func (c *L1Config) String() string {
Expand Down
37 changes: 17 additions & 20 deletions rollup/da_syncer/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,28 @@ import (

"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
)

type BatchQueue struct {
// batches is map from batchIndex to batch blocks
batches map[uint64]DAEntry
batches map[uint64]da.Entry
DAQueue *DAQueue
db ethdb.Database
lastFinalizedBatchIndex uint64
}

func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database) *BatchQueue {
return &BatchQueue{
batches: make(map[uint64]DAEntry),
batches: make(map[uint64]da.Entry),
DAQueue: DAQueue,
db: db,
lastFinalizedBatchIndex: 0,
}
}

// NextBatch finds the next finalized batch and returns the data that was committed in that batch
func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) {
func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) {
if batch, ok := bq.getFinalizedBatch(); ok {
return batch, nil
}
Expand All @@ -36,18 +37,14 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) {
if err != nil {
return nil, err
}
switch daEntry := daEntry.(type) {
case *CommitBatchDAV0:
bq.batches[daEntry.BatchIndex] = daEntry
case *CommitBatchDAV1:
bq.batches[daEntry.BatchIndex] = daEntry
case *CommitBatchDAV2:
bq.batches[daEntry.BatchIndex] = daEntry
case *RevertBatchDA:
bq.deleteBatch(daEntry.BatchIndex)
case *FinalizeBatchDA:
if daEntry.BatchIndex > bq.lastFinalizedBatchIndex {
bq.lastFinalizedBatchIndex = daEntry.BatchIndex
switch daEntry.Type() {
case da.CommitBatchV0Type, da.CommitBatchV1Type, da.CommitBatchV2Type:
bq.batches[daEntry.BatchIndex()] = daEntry
case da.RevertBatchType:
bq.deleteBatch(daEntry.BatchIndex())
case da.FinalizeBatchType:
if daEntry.BatchIndex() > bq.lastFinalizedBatchIndex {
bq.lastFinalizedBatchIndex = daEntry.BatchIndex()
}
ret, ok := bq.getFinalizedBatch()
if ok {
Expand All @@ -62,7 +59,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) {
}

// getFinalizedBatch returns the next finalized batch if one is available
func (bq *BatchQueue) getFinalizedBatch() (DAEntry, bool) {
func (bq *BatchQueue) getFinalizedBatch() (da.Entry, bool) {
if len(bq.batches) == 0 {
return nil, false
}
Expand Down Expand Up @@ -93,7 +90,7 @@ func (bq *BatchQueue) deleteBatch(batchIndex uint64) {
if !ok {
return
}
curBatchL1Height := batch.GetL1BlockNumber()
curBatchL1Height := batch.L1BlockNumber()
delete(bq.batches, batchIndex)
if len(bq.batches) == 0 {
rawdb.WriteDASyncedL1BlockNumber(bq.db, curBatchL1Height)
Expand All @@ -102,10 +99,10 @@ func (bq *BatchQueue) deleteBatch(batchIndex uint64) {
// we store here min height of currently loaded batches to be able to start syncing from the same place in case of restart
var minBatchL1Height uint64 = math.MaxUint64
for _, val := range bq.batches {
if val.GetL1BlockNumber() < minBatchL1Height {
minBatchL1Height = val.GetL1BlockNumber()
if val.L1BlockNumber() < minBatchL1Height {
minBatchL1Height = val.L1BlockNumber()
}
}
rawdb.WriteDASyncedL1BlockNumber(bq.db, minBatchL1Height-1)

rawdb.WriteDASyncedL1BlockNumber(bq.db, minBatchL1Height-1)
}
Loading
Loading