bsc: incrementally calc and store all old snapshots. slower initial stage_snapshots, but faster restart during initial sync (erigontech#6457)
AskAlexSharov authored Dec 30, 2022
1 parent c8aead7 commit b0af04f
Showing 19 changed files with 121 additions and 83 deletions.
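The commit title describes the trade-off: while the snapshots stage of initial sync walks old BSC headers, Parlia validator snapshots are now computed incrementally and each checkpoint snapshot is written to the database instead of living only in memory. The stage itself gets slower, but a restarted node can resume from the last stored snapshot rather than recomputing everything from genesis. Below is a minimal, self-contained sketch of that idea only - simplified stand-in types, not the actual Erigon code; the real logic lives in consensus/parlia and the snapshots stage:

```go
package main

import "fmt"

const checkpointInterval = 1024 // matches CheckpointInterval in consensus/parlia/parlia.go

type header struct{ Number uint64 }
type snapshot struct{ Number uint64 /* validator set, recent signers, ... elided */ }

// apply folds one header into the running snapshot (validator rotation elided).
func (s snapshot) apply(h header) snapshot { return snapshot{Number: h.Number} }

// buildIncrementally walks old headers once and persists every checkpoint
// snapshot via put, so a restart during initial sync can resume from the last
// stored snapshot instead of recomputing the whole chain from genesis.
func buildIncrementally(snap snapshot, headers []header, put func(snapshot) error) (snapshot, error) {
	for _, h := range headers {
		snap = snap.apply(h)
		if snap.Number%checkpointInterval == 0 {
			if err := put(snap); err != nil { // store on disk, not just in the LRU cache
				return snap, err
			}
		}
	}
	return snap, nil
}

func main() {
	hs := make([]header, 4096)
	for i := range hs {
		hs[i] = header{Number: uint64(i + 1)}
	}
	stored := 0
	last, _ := buildIncrementally(snapshot{}, hs, func(snapshot) error { stored++; return nil })
	fmt.Println(last.Number, stored) // 4096 4 - one stored checkpoint per 1024 blocks
}
```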
27 changes: 15 additions & 12 deletions cmd/integration/commands/stages.go
@@ -478,14 +478,17 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {
defer sn.Close()
defer agg.Close()
br := getBlockReader(db)
engine, _, _, _, _ := newSync(ctx, db, nil)
chainConfig, _, _ := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)

return db.Update(ctx, func(tx kv.RwTx) error {
if !(unwind > 0 || reset) {
log.Info("This command only works with --unwind or --reset options")
}

if reset {
dirs := datadir.New(datadirCli)
if err := reset2.ResetBlocks(tx, db, sn, br, dirs.Tmp); err != nil {
if err := reset2.ResetBlocks(tx, db, sn, br, dirs, *chainConfig, engine); err != nil {
return err
}
return nil
@@ -1242,7 +1245,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
panic(err)
}

sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &shards.Notifications{}, nil, allSn, agg, nil)
sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &shards.Notifications{}, nil, allSn, agg, nil, engine)
if err != nil {
panic(err)
}
@@ -1303,27 +1306,27 @@ func overrideStorageMode(db kv.RwDB) error {
})
}

func initConsensusEngine(chainConfig *params.ChainConfig, datadir string, db kv.RwDB) (engine consensus.Engine) {
logger := log.New()
func initConsensusEngine(cc *params.ChainConfig, datadir string, db kv.RwDB) (engine consensus.Engine) {
l := log.New()
snapshots, _ := allSnapshots(context.Background(), db)
config := ethconfig.Defaults

switch {
case chainConfig.Clique != nil:
case cc.Clique != nil:
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case chainConfig.Aura != nil:
engine = ethconsensusconfig.CreateConsensusEngine(cc, l, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case cc.Aura != nil:
consensusConfig := &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case chainConfig.Parlia != nil:
engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case cc.Parlia != nil:
// Apply special hacks for BSC params
params.ApplyBinanceSmartChainParams()
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case chainConfig.Bor != nil:
engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db)
case cc.Bor != nil:
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, snapshots, db.ReadOnly(), db)
engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, snapshots, db.ReadOnly(), db)
default: //ethash
engine = ethash.NewFaker()
}
10 changes: 5 additions & 5 deletions cmd/rpcdaemon/cli/config.go
@@ -282,9 +282,9 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

kvClient := remote.NewKVClient(conn)
remoteBackendClient := remote.NewETHBACKENDClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, kvClient).Open()
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
}
@@ -339,7 +339,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
if cfg.Snap.Enabled {
allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
// To provide good UX - snapshots can be read immediately after RPCDaemon start, even if Erigon is down
// Erigon stores the list of snapshots in db, so RPCDaemon can read this list now and re-read it via `kvClient.Snapshots` once the grpc connection is established
// Erigon stores the list of snapshots in db, so RPCDaemon can read this list now and re-read it via `remoteKvClient.Snapshots` once the grpc connection is established
allSnapshots.OptimisticReopenWithDB(db)
allSnapshots.LogStat()

@@ -357,7 +357,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
})
onNewSnapshot = func() {
go func() { // don't block events processing by network communication
reply, err := kvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
if err != nil {
log.Warn("[Snapshots] reopen", "err", err)
return
@@ -431,7 +431,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
log.Info("if you run RPCDaemon on same machine with Erigon add --datadir option")
}

subscribeToStateChangesLoop(ctx, kvClient, stateCache)
subscribeToStateChangesLoop(ctx, remoteKvClient, stateCache)

txpoolConn := conn
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
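The renamed remoteKvClient supports the startup flow described in the comment above: the RPCDaemon optimistically reopens snapshots from the list Erigon persisted in the db, then refreshes that list through the Snapshots gRPC call once the connection is up. A hedged sketch of that two-step flow - snapshotStore and its methods are hypothetical stand-ins for snapshotsync.RoSnapshots, and the file names are made up:

```go
package main

import "fmt"

// snapshotStore is a hypothetical stand-in for snapshotsync.RoSnapshots.
type snapshotStore struct{ files []string }

func (s *snapshotStore) reopenFromDB()                 { s.files = []string{"headers-seg-A"} } // cached list persisted by Erigon
func (s *snapshotStore) reopenFromList(files []string) { s.files = files }                     // authoritative list from Erigon

func main() {
	snaps := &snapshotStore{}

	// 1. Optimistic reopen at startup: the list stored in the db is readable
	//    immediately, even if Erigon itself is currently down.
	snaps.reopenFromDB()
	fmt.Println("at startup:", snaps.files)

	// 2. Once the grpc connection is established, refresh from the reply of
	//    remoteKvClient.Snapshots (simulated here by a literal slice).
	onNewSnapshot := func(fromErigon []string) { snaps.reopenFromList(fromErigon) }
	onNewSnapshot([]string{"headers-seg-A", "headers-seg-B"})
	fmt.Println("after refresh:", snaps.files)
}
```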
2 changes: 1 addition & 1 deletion cmd/state/commands/check_change_sets.go
@@ -122,7 +122,7 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64,
commitEvery := time.NewTicker(30 * time.Second)
defer commitEvery.Stop()

engine := initConsensusEngine(chainConfig, logger, allSnapshots)
engine := initConsensusEngine(chainConfig, allSnapshots)

for !interrupt {

13 changes: 7 additions & 6 deletions cmd/state/commands/erigon2.go
@@ -238,7 +238,7 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
} else {
blockReader = snapshotsync.NewBlockReader()
}
engine := initConsensusEngine(chainConfig, logger, allSnapshots)
engine := initConsensusEngine(chainConfig, allSnapshots)

for !interrupt {
blockNum++
@@ -624,25 +624,26 @@ func (ww *WriterWrapper) CreateContract(address common.Address) error {
return nil
}

func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) {
func initConsensusEngine(chainConfig *params.ChainConfig, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) {
l := log.New()
config := ethconfig.Defaults

switch {
case chainConfig.Clique != nil:
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadirCli, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
case chainConfig.Aura != nil:
consensusConfig := &params.AuRaConfig{DBPath: filepath.Join(datadirCli, "aura")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
case chainConfig.Parlia != nil:
// Apply special hacks for BSC params
params.ApplyBinanceSmartChainParams()
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadirCli, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */)
case chainConfig.Bor != nil:
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallURL, false, datadirCli, snapshots, true /* readonly */)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallURL, false, datadirCli, snapshots, true /* readonly */)
default: //ethash
engine = ethash.NewFaker()
}
2 changes: 1 addition & 1 deletion cmd/state/commands/erigon4.go
@@ -197,7 +197,7 @@ func Erigon4(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
engine := initConsensusEngine(chainConfig, logger, allSnapshots)
engine := initConsensusEngine(chainConfig, allSnapshots)

getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
2 changes: 1 addition & 1 deletion cmd/state/exec3/state.go
@@ -160,7 +160,7 @@ func (rw *Worker) RunTxTask(txTask *exec22.TxTask) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
}

if _, _, err := rw.engine.Finalize(rw.chainConfig, header, ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil {
if _, _, err := rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil {
//fmt.Printf("error=%v\n", err)
txTask.Error = err
} else {
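Both parallel-execution call sites (here and in state_recon.go below) now pass types.CopyHeader(header) into Finalize. The commit does not state why; a plausible reading - an assumption, not confirmed by the diff - is that an engine's Finalize may modify header fields while other workers still read the shared header, so each worker finalizes its own copy. A generic sketch of that defensive-copy pattern:

```go
package main

import "fmt"

type header struct {
	Number  uint64
	GasUsed uint64
}

// copyHeader plays the role of types.CopyHeader: the callee gets its own value,
// so whatever it mutates never touches the shared original.
func copyHeader(h *header) *header { c := *h; return &c }

// finalize stands in for consensus.Engine.Finalize and mutates its argument.
func finalize(h *header) { h.GasUsed = 21000 }

func main() {
	shared := &header{Number: 100}
	finalize(copyHeader(shared)) // the worker mutates only its private copy
	fmt.Println(shared.GasUsed)  // 0 - the shared header stays untouched
}
```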
3 changes: 2 additions & 1 deletion cmd/state/exec3/state_recon.go
@@ -18,6 +18,7 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/systemcontracts"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
@@ -310,7 +311,7 @@ func (rw *ReconWorker) runTxTask(txTask *exec22.TxTask) {
syscall := func(contract common.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Header, rw.engine, false /* constCall */)
}
if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil {
if _, _, err := rw.engine.Finalize(rw.chainConfig, types.CopyHeader(txTask.Header), ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil {
if _, readError := rw.stateReader.ReadError(); !readError {
panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err))
}
3 changes: 2 additions & 1 deletion consensus/clique/clique.go
@@ -21,6 +21,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math/big"
"math/rand"
@@ -423,7 +424,7 @@ func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
return err
}
if _, authorized := snap.Signers[signer]; !authorized {
return ErrUnauthorizedSigner
return fmt.Errorf("Clique.Seal: %w", ErrUnauthorizedSigner)
}
// If we're amongst the recent signers, wait for the next block
for seen, recent := range snap.Recents {
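Several changes in this commit (Clique.Seal above, and the Parlia checks below) replace bare sentinel returns with fmt.Errorf("...: %w", err) so the error carries call-site context. Because %w wraps rather than replaces the sentinel, existing callers that match with errors.Is keep working; a minimal illustration:

```go
package main

import (
	"errors"
	"fmt"
)

var errUnauthorizedValidator = errors.New("unauthorized validator")

// seal mimics the new style: wrap the sentinel with %w to add context.
func seal() error {
	return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator)
}

func main() {
	err := seal()
	fmt.Println(errors.Is(err, errUnauthorizedValidator)) // true - callers still match the sentinel
	fmt.Println(err)                                      // parlia.Seal: unauthorized validator
}
```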
41 changes: 21 additions & 20 deletions consensus/parlia/parlia.go
@@ -47,7 +47,7 @@ const (
inMemorySnapshots = 128 // Number of recent snapshots to keep in memory
inMemorySignatures = 4096 // Number of recent block signatures to keep in memory

checkpointInterval = 1024 // Number of blocks after which to save the snapshot to the database
CheckpointInterval = 1024 // Number of blocks after which to save the snapshot to the database
defaultEpochLength = uint64(100) // Default number of blocks of checkpoint to update validatorSet from contract

extraVanity = 32 // Fixed number of extra-data prefix bytes reserved for signer vanity
@@ -342,7 +342,7 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H

// Don't waste time checking blocks from the future
if header.Time > uint64(time.Now().Unix()) {
return consensus.ErrFutureBlock
return fmt.Errorf("header %d, time %d, now %d, %w", header.Number.Uint64(), header.Time, time.Now().Unix(), consensus.ErrFutureBlock)
}
// Check that the extra-data contains the vanity, validators and signature.
if len(header.Extra) < extraVanity {
@@ -475,7 +475,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea
}

if _, ok := snap.Validators[signer]; !ok {
return errUnauthorizedValidator
return fmt.Errorf("parlia.verifySeal: headerNum=%d, validator=%x, %w", header.Number.Uint64(), signer.Bytes(), errUnauthorizedValidator)
}

for seen, recent := range snap.Recents {
@@ -526,7 +526,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}

// If an on-disk checkpoint snapshot can be found, use that
if number%checkpointInterval == 0 {
if number%CheckpointInterval == 0 {
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
//log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
@@ -535,21 +535,22 @@
}
}
}
if (verify && number%p.config.Epoch == 0) || number == 0 {
if (p.snapshots != nil && number <= p.snapshots.BlocksAvailable()) || number == 0 {
// Headers included into the snapshots have to be trusted as checkpoints
checkpoint := chain.GetHeader(hash, number)
if checkpoint != nil {
validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
}
// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
break
if number == 0 {
// Headers included into the snapshots have to be trusted as checkpoints
checkpoint := chain.GetHeader(hash, number)
if checkpoint != nil {
validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
}
// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
if err := snap.store(p.db); err != nil {
return nil, err
}
break
}
}

@@ -592,7 +593,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
p.recentSnaps.Add(snap.Hash, snap)

// If we've generated a new checkpoint snapshot, save to disk
if verify && snap.Number%checkpointInterval == 0 && len(headers) > 0 {
if verify && snap.Number%CheckpointInterval == 0 && len(headers) > 0 {
if err = snap.store(p.db); err != nil {
return nil, err
}
@@ -841,7 +842,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res

// Bail out if we're unauthorized to sign a block
if _, authorized := snap.Validators[val]; !authorized {
return errUnauthorizedValidator
return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator)
}

// If we're amongst the recent signers, wait for the next block
3 changes: 2 additions & 1 deletion consensus/parlia/ramanujanfork.go
@@ -1,6 +1,7 @@
package parlia

import (
"fmt"
"math/rand"
"time"

@@ -37,7 +38,7 @@ func (p *Parlia) blockTimeForRamanujanFork(snap *Snapshot, header, parent *types
func (p *Parlia) blockTimeVerifyForRamanujanFork(snap *Snapshot, header, parent *types.Header) error {
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
if header.Time < parent.Time+p.config.Period+backOffTime(snap, header.Coinbase) {
return consensus.ErrFutureBlock
return fmt.Errorf("header %d, time %d, now %d, period: %d, backoff: %d, %w", header.Number.Uint64(), header.Time, time.Now().Unix(), p.config.Period, backOffTime(snap, header.Coinbase), consensus.ErrFutureBlock)
}
}
return nil
28 changes: 12 additions & 16 deletions consensus/parlia/snapshot.go
@@ -19,22 +19,21 @@ package parlia
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"

lru "github.com/hashicorp/golang-lru"

common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/log/v3"
)

// Snapshot is the state of the validatorSet at a given point.
@@ -81,20 +80,13 @@ func (s validatorsAscending) Len() int { return len(s) }
func (s validatorsAscending) Less(i, j int) bool { return bytes.Compare(s[i][:], s[j][:]) < 0 }
func (s validatorsAscending) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

const NumberLength = 8

// EncodeBlockNumber encodes a block number as big endian uint64
func EncodeBlockNumber(number uint64) []byte {
enc := make([]byte, NumberLength)
binary.BigEndian.PutUint64(enc, number)
return enc
}

// SnapshotFullKey = SnapshotBucket + num (uint64 big endian) + hash
func SnapshotFullKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
return append(common2.EncodeTs(number), hash.Bytes()...)
}

var ErrNoSnapsnot = fmt.Errorf("no parlia snapshot")

// loadSnapshot loads an existing snapshot from the database.
func loadSnapshot(config *params.ParliaConfig, sigCache *lru.ARCCache, db kv.RwDB, num uint64, hash common.Hash) (*Snapshot, error) {
tx, err := db.BeginRo(context.Background())
@@ -106,6 +98,10 @@ func loadSnapshot(config *params.ParliaConfig, sigCache *lru.ARCCache, db kv.RwD
if err != nil {
return nil, err
}

if len(blob) == 0 {
return nil, ErrNoSnapsnot
}
snap := new(Snapshot)
if err := json.Unmarshal(blob, snap); err != nil {
return nil, err
@@ -121,7 +117,7 @@ func (s *Snapshot) store(db kv.RwDB) error {
if err != nil {
return err
}
return db.Update(context.Background(), func(tx kv.RwTx) error {
return db.UpdateAsync(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.ParliaSnapshot, SnapshotFullKey(s.Number, s.Hash), blob)
})
}
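SnapshotFullKey keeps the same key layout as before - 8-byte big-endian block number followed by the 32-byte hash - but now builds the prefix with erigon-lib's common2.EncodeTs instead of the removed local EncodeBlockNumber helper. Big-endian encoding matters because it makes byte-wise key ordering in the database equal numeric block ordering; a self-contained sketch of the same layout:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// snapshotKey mirrors SnapshotFullKey: 8-byte big-endian block number + 32-byte hash.
func snapshotKey(number uint64, hash [32]byte) []byte {
	k := make([]byte, 8, 8+32)
	binary.BigEndian.PutUint64(k, number) // big-endian: lexicographic order == numeric order
	return append(k, hash[:]...)
}

func main() {
	var h [32]byte
	a := snapshotKey(1024, h)
	b := snapshotKey(2048, h)
	fmt.Println(bytes.Compare(a, b) < 0) // true - lower block numbers sort first in range scans
}
```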