Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: implement beacon sync #23982

Merged
merged 22 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0f78c4f
eth/downloader: implement beacon sync
karalabe Sep 30, 2021
0d22561
eth/downloader: fix a crash if the beacon chain is reduced in length
karalabe Jan 11, 2022
786308d
eth/downloader: fix beacon sync start/stop thrashing data race
karalabe Jan 11, 2022
49b7771
eth/downloader: use a non-nil pivot even in degenerate sync requests
karalabe Jan 11, 2022
3f16180
eth/downloader: don't touch internal state on beacon Head retrieval
karalabe Jan 11, 2022
1c5a698
eth/downloader: fix spelling mistakes
karalabe Jan 14, 2022
344d81a
eth/downloader: fix some typos
karalabe Jan 18, 2022
944e050
eth: integrate legacy/beacon sync switchover and UX
karalabe Nov 2, 2021
0a50767
eth: handle UX wise being stuck on post-merge TTD
karalabe Feb 1, 2022
f6a427d
core, eth: integrate the beacon client with the beacon sync
karalabe Feb 2, 2022
ba9e91f
eth/catalyst: make some warning messages nicer
karalabe Feb 2, 2022
ef4d465
eth/downloader: remove Ethereum 1&2 notions in favor of merge
karalabe Feb 2, 2022
92020c9
core/beacon, eth: clean up engine API returns a bit
karalabe Feb 3, 2022
46c0e3f
eth/downloader: add skeleton extension tests
karalabe Feb 3, 2022
336b5df
eth/catalyst: keep non-kiln spec, handle mining on ttd
karalabe Feb 3, 2022
38114ad
eth/downloader: add beacon header retrieval tests
karalabe Feb 23, 2022
71861e7
eth: fixed spelling, commented failing tests out
MariusVanDerWijden Mar 7, 2022
7ac324b
eth/downloader: review fixes
karalabe Mar 9, 2022
4f2c4e9
eth/downloader: drop peers failing to deliver beacon headers
karalabe Mar 11, 2022
2cb92eb
core/rawdb: track beacon sync data in db inspect
karalabe Mar 11, 2022
bec4fd6
eth: fix review concerns
karalabe Mar 11, 2022
7c12053
internal/web3ext: nit
karalabe Mar 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions core/beacon/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,25 @@ package beacon
import "github.com/ethereum/go-ethereum/rpc"

var (
VALID = GenericStringResponse{"VALID"}

This comment was marked as spam.

This comment was marked as spam.

SUCCESS = GenericStringResponse{"SUCCESS"}
INVALID = ForkChoiceResponse{Status: "INVALID", PayloadID: nil}

This comment was marked as spam.

SYNCING = ForkChoiceResponse{Status: "SYNCING", PayloadID: nil}
// VALID is returned by the engine API in the following calls:
// - newPayloadV1: if the payload was already known or was just validated and executed
// - forkchoiceUpdateV1: if the chain accepted the reorg (might ignore if it's stale)

This comment was marked as spam.

VALID = "VALID"

This comment was marked as spam.


// INVALID is returned by the engine API in the following calls:
// - newPayloadV1: if the payload failed to execute on top of the local chain
// - forkchoiceUpdateV1: if the new head is unknown, pre-merge, or reorg to it fails
INVALID = "INVALID"

// SYNCING is returned by the engine API in the following calls:
// - newPayloadV1: if the payload was accepted on top of an active sync
// - forkchoiceUpdateV1: if the new head was seen before, but not part of the chain
SYNCING = "SYNCING"

// ACCEPTED is returned by the engine API in the following calls:
// - newPayloadV1: if the payload was accepted, but not processed (side chain)
ACCEPTED = "ACCEPTED"

GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
Expand Down
12 changes: 0 additions & 12 deletions core/beacon/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,6 @@ type executableDataMarshaling struct {
Transactions []hexutil.Bytes
}

type NewBlockResponse struct {
Valid bool `json:"valid"`
}

type GenericResponse struct {
Success bool `json:"success"`
}

type GenericStringResponse struct {
Status string `json:"status"`
}

type ExecutePayloadResponse struct {
Status string `json:"status"`
LatestValidHash common.Hash `json:"latestValidHash"`
Expand Down
44 changes: 27 additions & 17 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,12 +1646,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)

// Report the import stats before returning the various results
stats.processed++
stats.usedGas += usedGas

dirty, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, it.index, dirty, setHead)

if !setHead {
// We did not setHead, so we don't have any stats to update
log.Info("Inserted block", "number", block.Number(), "hash", block.Hash(), "txs", len(block.Transactions()), "elapsed", common.PrettyDuration(time.Since(start)))
return it.index, nil
return it.index, nil // Direct block insertion of a single block
}

switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
Expand All @@ -1678,11 +1682,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
}
stats.processed++
stats.usedGas += usedGas

dirty, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, it.index, dirty)
}

// Any blocks remaining here? The only ones we care about are the future ones
Expand Down Expand Up @@ -2079,28 +2078,39 @@ func (bc *BlockChain) InsertBlockWithoutSetHead(block *types.Block) error {
// block. It's possible that after the reorg the relevant state of head
// is missing. It can be fixed by inserting a new block which triggers
// the re-execution.
func (bc *BlockChain) SetChainHead(newBlock *types.Block) error {
func (bc *BlockChain) SetChainHead(head *types.Block) error {
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()

// Run the reorg if necessary and set the given block as new head.
if newBlock.ParentHash() != bc.CurrentBlock().Hash() {
if err := bc.reorg(bc.CurrentBlock(), newBlock); err != nil {
start := time.Now()
if head.ParentHash() != bc.CurrentBlock().Hash() {
if err := bc.reorg(bc.CurrentBlock(), head); err != nil {
return err
}
}
bc.writeHeadBlock(newBlock)
bc.writeHeadBlock(head)

// Emit events
logs := bc.collectLogs(newBlock.Hash(), false)
bc.chainFeed.Send(ChainEvent{Block: newBlock, Hash: newBlock.Hash(), Logs: logs})
logs := bc.collectLogs(head.Hash(), false)
bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: newBlock})
log.Info("Set the chain head", "number", newBlock.Number(), "hash", newBlock.Hash())
bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})

context := []interface{}{
"number", head.Number(),
"hash", head.Hash(),
"root", head.Root(),
"elapsed", time.Since(start),
}
if timestamp := time.Unix(int64(head.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
log.Info("Chain head was updated", context...)
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize) {
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand Down Expand Up @@ -71,8 +71,11 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("Imported new chain segment", context...)

if setHead {
log.Info("Imported new chain segment", context...)
} else {
log.Info("Imported new potential chain segment", context...)
}
// Bump the stats reported to the next section
*st = insertStats{startTime: now, lastIndex: index + 1}
}
Expand Down
80 changes: 80 additions & 0 deletions core/rawdb/accessors_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
"bytes"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown.
func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte {
data, _ := db.Get(skeletonSyncStatusKey)
return data
}

// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown.
func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) {
if err := db.Put(skeletonSyncStatusKey, status); err != nil {
log.Crit("Failed to store skeleton sync status", "err", err)
}
}

// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last
// shutdown
func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) {
if err := db.Delete(skeletonSyncStatusKey); err != nil {
log.Crit("Failed to remove skeleton sync status", "err", err)
}
}

// ReadSkeletonHeader retrieves a block header from the skeleton sync store,
func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header {
data, _ := db.Get(skeletonHeaderKey(number))
if len(data) == 0 {
return nil
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(data), header); err != nil {
log.Error("Invalid skeleton header RLP", "number", number, "err", err)
return nil
}
return header
}

// WriteSkeletonHeader stores a block header into the skeleton sync store.
func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) {
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
}
key := skeletonHeaderKey(header.Number.Uint64())
if err := db.Put(key, data); err != nil {
log.Crit("Failed to store skeleton header", "err", err)
}
}

// DeleteSkeletonHeader removes all block header data associated with a hash.
func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
if err := db.Delete(skeletonHeaderKey(number)); err != nil {
log.Crit("Failed to delete skeleton header", "err", err)
}
}
6 changes: 5 additions & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
storageSnaps stat
preimages stat
bloomBits stat
beaconHeaders stat
cliqueSnaps stat

// Ancient store statistics
Expand Down Expand Up @@ -379,6 +380,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
bloomBits.Add(size)
case bytes.HasPrefix(key, BloomBitsIndexPrefix):
bloomBits.Add(size)
case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
beaconHeaders.Add(size)
case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength:
cliqueSnaps.Add(size)
case bytes.HasPrefix(key, []byte("cht-")) ||
Expand All @@ -395,7 +398,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey,
fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down Expand Up @@ -441,6 +444,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()},
{"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()},
{"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()},
{"Key-Value store", "Beacon sync headers", beaconHeaders.Size(), beaconHeaders.Count()},
{"Key-Value store", "Clique snapshots", cliqueSnaps.Size(), cliqueSnaps.Count()},
{"Key-Value store", "Singleton metadata", metadata.Size(), metadata.Count()},
{"Ancient store", "Headers", ancientHeadersSize.String(), ancients.String()},
Expand Down
9 changes: 9 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ var (
// snapshotSyncStatusKey tracks the snapshot sync status across restarts.
snapshotSyncStatusKey = []byte("SnapshotSyncStatus")

// skeletonSyncStatusKey tracks the skeleton sync status across restarts.
skeletonSyncStatusKey = []byte("SkeletonSyncStatus")

// txIndexTailKey tracks the oldest block whose transactions have been indexed.
txIndexTailKey = []byte("TransactionIndexTail")

Expand Down Expand Up @@ -92,6 +95,7 @@ var (
SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
CodePrefix = []byte("c") // CodePrefix + code hash -> account code
skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefix + num (uint64 big endian) -> header
karalabe marked this conversation as resolved.
Show resolved Hide resolved

PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
Expand Down Expand Up @@ -210,6 +214,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
return key
}

// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
func skeletonHeaderKey(number uint64) []byte {
return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)
}

// preimageKey = PreimagePrefix + hash
func preimageKey(hash common.Hash) []byte {
return append(PreimagePrefix, hash.Bytes()...)
Expand Down
Loading