diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 38f6a1f57e0..f2f77e30dd1 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -478,6 +478,9 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error { defer sn.Close() defer agg.Close() br := getBlockReader(db) + engine, _, _, _, _ := newSync(ctx, db, nil) + chainConfig, _, _ := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db) + return db.Update(ctx, func(tx kv.RwTx) error { if !(unwind > 0 || reset) { log.Info("This command only works with --unwind or --reset options") @@ -485,7 +488,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error { if reset { dirs := datadir.New(datadirCli) - if err := reset2.ResetBlocks(tx, db, sn, br, dirs.Tmp); err != nil { + if err := reset2.ResetBlocks(tx, db, sn, br, dirs, *chainConfig, engine); err != nil { return err } return nil @@ -1242,7 +1245,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) panic(err) } - sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &shards.Notifications{}, nil, allSn, agg, nil) + sync, err := stages2.NewStagedSync(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, &shards.Notifications{}, nil, allSn, agg, nil, engine) if err != nil { panic(err) } @@ -1303,27 +1306,27 @@ func overrideStorageMode(db kv.RwDB) error { }) } -func initConsensusEngine(chainConfig *params.ChainConfig, datadir string, db kv.RwDB) (engine consensus.Engine) { - logger := log.New() +func initConsensusEngine(cc *params.ChainConfig, datadir string, db kv.RwDB) (engine consensus.Engine) { + l := log.New() snapshots, _ := allSnapshots(context.Background(), db) config := ethconfig.Defaults switch { - case chainConfig.Clique != nil: + case cc.Clique != nil: c := params.CliqueSnapshot c.DBPath = filepath.Join(datadir, "clique", "db") - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) - case chainConfig.Aura != nil: + engine = ethconsensusconfig.CreateConsensusEngine(cc, l, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) + case cc.Aura != nil: consensusConfig := ¶ms.AuRaConfig{DBPath: filepath.Join(datadir, "aura")} - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) - case chainConfig.Parlia != nil: + engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) + case cc.Parlia != nil: // Apply special hacks for BSC params params.ApplyBinanceSmartChainParams() consensusConfig := ¶ms.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")} - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) - case chainConfig.Bor != nil: + engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots, db.ReadOnly(), db) + case cc.Bor != nil: consensusConfig := &config.Bor - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, snapshots, db.ReadOnly(), db) + engine = ethconsensusconfig.CreateConsensusEngine(cc, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, snapshots, db.ReadOnly(), db) default: //ethash engine = ethash.NewFaker() } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 6c5f21cc9de..b81825dc2a5 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -282,9 +282,9 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err) } - kvClient := remote.NewKVClient(conn) remoteBackendClient := remote.NewETHBACKENDClient(conn) - remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, kvClient).Open() + remoteKvClient := remote.NewKVClient(conn) + remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open() if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err) } @@ -339,7 +339,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, if cfg.Snap.Enabled { allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap) // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down - // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `kvClient.Snapshots` after establish grpc connection + // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection allSnapshots.OptimisticReopenWithDB(db) allSnapshots.LogStat() @@ -357,7 +357,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, }) onNewSnapshot = func() { go func() { // don't block events processing by network communication - reply, err := kvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) + reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) if err != nil { log.Warn("[Snapshots] reopen", "err", err) return @@ -431,7 +431,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, log.Info("if you run RPCDaemon on same machine with Erigon add --datadir option") } - subscribeToStateChangesLoop(ctx, kvClient, stateCache) + subscribeToStateChangesLoop(ctx, remoteKvClient, stateCache) txpoolConn := conn if cfg.TxPoolApiAddr != cfg.PrivateApiAddr { diff --git a/cmd/state/commands/check_change_sets.go b/cmd/state/commands/check_change_sets.go index 1f451644602..2264e9c1049 100644 --- a/cmd/state/commands/check_change_sets.go +++ b/cmd/state/commands/check_change_sets.go @@ -122,7 +122,7 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, commitEvery := time.NewTicker(30 * time.Second) defer commitEvery.Stop() - engine := initConsensusEngine(chainConfig, logger, allSnapshots) + engine := initConsensusEngine(chainConfig, allSnapshots) for !interrupt { diff --git a/cmd/state/commands/erigon2.go b/cmd/state/commands/erigon2.go index 70cf93a0f22..93d98fd3502 100644 --- a/cmd/state/commands/erigon2.go +++ b/cmd/state/commands/erigon2.go @@ -238,7 +238,7 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log. } else { blockReader = snapshotsync.NewBlockReader() } - engine := initConsensusEngine(chainConfig, logger, allSnapshots) + engine := initConsensusEngine(chainConfig, allSnapshots) for !interrupt { blockNum++ @@ -624,25 +624,26 @@ func (ww *WriterWrapper) CreateContract(address common.Address) error { return nil } -func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) { +func initConsensusEngine(chainConfig *params.ChainConfig, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) { + l := log.New() config := ethconfig.Defaults switch { case chainConfig.Clique != nil: c := params.CliqueSnapshot c.DBPath = filepath.Join(datadirCli, "clique", "db") - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) + engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) case chainConfig.Aura != nil: consensusConfig := ¶ms.AuRaConfig{DBPath: filepath.Join(datadirCli, "aura")} - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) + engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) case chainConfig.Parlia != nil: // Apply special hacks for BSC params params.ApplyBinanceSmartChainParams() consensusConfig := ¶ms.ParliaConfig{DBPath: filepath.Join(datadirCli, "parlia")} - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) + engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadirCli, snapshots, true /* readonly */) case chainConfig.Bor != nil: consensusConfig := &config.Bor - engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallURL, false, datadirCli, snapshots, true /* readonly */) + engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, l, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallURL, false, datadirCli, snapshots, true /* readonly */) default: //ethash engine = ethash.NewFaker() } diff --git a/cmd/state/commands/erigon4.go b/cmd/state/commands/erigon4.go index c18ab23ba6b..e580f1e00ae 100644 --- a/cmd/state/commands/erigon4.go +++ b/cmd/state/commands/erigon4.go @@ -197,7 +197,7 @@ func Erigon4(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log. return fmt.Errorf("reopen snapshot segments: %w", err) } blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - engine := initConsensusEngine(chainConfig, logger, allSnapshots) + engine := initConsensusEngine(chainConfig, allSnapshots) getHeader := func(hash common.Hash, number uint64) *types.Header { h, err := blockReader.Header(ctx, historyTx, hash, number) diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index 24669213876..284830ae1ce 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -160,7 +160,7 @@ func (rw *Worker) RunTxTask(txTask *exec22.TxTask) { return core.SysCallContract(contract, data, *rw.chainConfig, ibs, header, rw.engine, false /* constCall */) } - if _, _, err := rw.engine.Finalize(rw.chainConfig, header, ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil { + if _, _, err := rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil { //fmt.Printf("error=%v\n", err) txTask.Error = err } else { diff --git a/cmd/state/exec3/state_recon.go b/cmd/state/exec3/state_recon.go index b8dc9945840..99e8f1b47dd 100644 --- a/cmd/state/exec3/state_recon.go +++ b/cmd/state/exec3/state_recon.go @@ -18,6 +18,7 @@ import ( "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/core/systemcontracts" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm/evmtypes" @@ -310,7 +311,7 @@ func (rw *ReconWorker) runTxTask(txTask *exec22.TxTask) { syscall := func(contract common.Address, data []byte) ([]byte, error) { return core.SysCallContract(contract, data, *rw.chainConfig, ibs, txTask.Header, rw.engine, false /* constCall */) } - if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil { + if _, _, err := rw.engine.Finalize(rw.chainConfig, types.CopyHeader(txTask.Header), ibs, txTask.Txs, txTask.Uncles, nil /* receipts */, txTask.Withdrawals, rw.epoch, rw.chain, syscall); err != nil { if _, readError := rw.stateReader.ReadError(); !readError { panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err)) } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 5f7b63d3a94..baf214878b2 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -21,6 +21,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "math/big" "math/rand" @@ -423,7 +424,7 @@ func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return err } if _, authorized := snap.Signers[signer]; !authorized { - return ErrUnauthorizedSigner + return fmt.Errorf("Clique.Seal: %w", ErrUnauthorizedSigner) } // If we're amongst the recent signers, wait for the next block for seen, recent := range snap.Recents { diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 22a3ab4d420..d9ee2c41cf7 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -47,7 +47,7 @@ const ( inMemorySnapshots = 128 // Number of recent snapshots to keep in memory inMemorySignatures = 4096 // Number of recent block signatures to keep in memory - checkpointInterval = 1024 // Number of blocks after which to save the snapshot to the database + CheckpointInterval = 1024 // Number of blocks after which to save the snapshot to the database defaultEpochLength = uint64(100) // Default number of blocks of checkpoint to update validatorSet from contract extraVanity = 32 // Fixed number of extra-data prefix bytes reserved for signer vanity @@ -342,7 +342,7 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H // Don't waste time checking blocks from the future if header.Time > uint64(time.Now().Unix()) { - return consensus.ErrFutureBlock + return fmt.Errorf("header %d, time %d, now %d, %w", header.Number.Uint64(), header.Time, time.Now().Unix(), consensus.ErrFutureBlock) } // Check that the extra-data contains the vanity, validators and signature. if len(header.Extra) < extraVanity { @@ -475,7 +475,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea } if _, ok := snap.Validators[signer]; !ok { - return errUnauthorizedValidator + return fmt.Errorf("parlia.verifySeal: headerNum=%d, validator=%x, %w", header.Number.Uint64(), signer.Bytes(), errUnauthorizedValidator) } for seen, recent := range snap.Recents { @@ -526,7 +526,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash } // If an on-disk checkpoint snapshot can be found, use that - if number%checkpointInterval == 0 { + if number%CheckpointInterval == 0 { if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil { //log.Trace("Loaded snapshot from disk", "number", number, "hash", hash) snap = s @@ -535,21 +535,22 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash } } } - if (verify && number%p.config.Epoch == 0) || number == 0 { - if (p.snapshots != nil && number <= p.snapshots.BlocksAvailable()) || number == 0 { - // Headers included into the snapshots have to be trusted as checkpoints - checkpoint := chain.GetHeader(hash, number) - if checkpoint != nil { - validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal] - // get validators from headers - validators, err := ParseValidators(validatorBytes) - if err != nil { - return nil, err - } - // new snapshot - snap = newSnapshot(p.config, p.signatures, number, hash, validators) - break + if number == 0 { + // Headers included into the snapshots have to be trusted as checkpoints + checkpoint := chain.GetHeader(hash, number) + if checkpoint != nil { + validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal] + // get validators from headers + validators, err := ParseValidators(validatorBytes) + if err != nil { + return nil, err + } + // new snapshot + snap = newSnapshot(p.config, p.signatures, number, hash, validators) + if err := snap.store(p.db); err != nil { + return nil, err } + break } } @@ -592,7 +593,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash p.recentSnaps.Add(snap.Hash, snap) // If we've generated a new checkpoint snapshot, save to disk - if verify && snap.Number%checkpointInterval == 0 && len(headers) > 0 { + if verify && snap.Number%CheckpointInterval == 0 && len(headers) > 0 { if err = snap.store(p.db); err != nil { return nil, err } @@ -841,7 +842,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res // Bail out if we're unauthorized to sign a block if _, authorized := snap.Validators[val]; !authorized { - return errUnauthorizedValidator + return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator) } // If we're amongst the recent signers, wait for the next block diff --git a/consensus/parlia/ramanujanfork.go b/consensus/parlia/ramanujanfork.go index 1756d795896..fbc77b9e468 100644 --- a/consensus/parlia/ramanujanfork.go +++ b/consensus/parlia/ramanujanfork.go @@ -1,6 +1,7 @@ package parlia import ( + "fmt" "math/rand" "time" @@ -37,7 +38,7 @@ func (p *Parlia) blockTimeForRamanujanFork(snap *Snapshot, header, parent *types func (p *Parlia) blockTimeVerifyForRamanujanFork(snap *Snapshot, header, parent *types.Header) error { if p.chainConfig.IsRamanujan(header.Number.Uint64()) { if header.Time < parent.Time+p.config.Period+backOffTime(snap, header.Coinbase) { - return consensus.ErrFutureBlock + return fmt.Errorf("header %d, time %d, now %d, period: %d, backof: %d, %w", header.Number.Uint64(), header.Time, time.Now().Unix(), p.config.Period, backOffTime(snap, header.Coinbase), consensus.ErrFutureBlock) } } return nil diff --git a/consensus/parlia/snapshot.go b/consensus/parlia/snapshot.go index 1f956881615..1e91ba9a72b 100644 --- a/consensus/parlia/snapshot.go +++ b/consensus/parlia/snapshot.go @@ -19,22 +19,21 @@ package parlia import ( "bytes" "context" - "encoding/binary" "encoding/hex" "encoding/json" "errors" + "fmt" "math/big" "sort" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" - lru "github.com/hashicorp/golang-lru" - + common2 "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/log/v3" ) // Snapshot is the state of the validatorSet at a given point. @@ -81,20 +80,13 @@ func (s validatorsAscending) Len() int { return len(s) } func (s validatorsAscending) Less(i, j int) bool { return bytes.Compare(s[i][:], s[j][:]) < 0 } func (s validatorsAscending) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -const NumberLength = 8 - -// EncodeBlockNumber encodes a block number as big endian uint64 -func EncodeBlockNumber(number uint64) []byte { - enc := make([]byte, NumberLength) - binary.BigEndian.PutUint64(enc, number) - return enc -} - // SnapshotFullKey = SnapshotBucket + num (uint64 big endian) + hash func SnapshotFullKey(number uint64, hash common.Hash) []byte { - return append(EncodeBlockNumber(number), hash.Bytes()...) + return append(common2.EncodeTs(number), hash.Bytes()...) } +var ErrNoSnapsnot = fmt.Errorf("no parlia snapshot") + // loadSnapshot loads an existing snapshot from the database. func loadSnapshot(config *params.ParliaConfig, sigCache *lru.ARCCache, db kv.RwDB, num uint64, hash common.Hash) (*Snapshot, error) { tx, err := db.BeginRo(context.Background()) @@ -106,6 +98,10 @@ func loadSnapshot(config *params.ParliaConfig, sigCache *lru.ARCCache, db kv.RwD if err != nil { return nil, err } + + if len(blob) == 0 { + return nil, ErrNoSnapsnot + } snap := new(Snapshot) if err := json.Unmarshal(blob, snap); err != nil { return nil, err @@ -121,7 +117,7 @@ func (s *Snapshot) store(db kv.RwDB) error { if err != nil { return err } - return db.Update(context.Background(), func(tx kv.RwTx) error { + return db.UpdateAsync(context.Background(), func(tx kv.RwTx) error { return tx.Put(kv.ParliaSnapshot, SnapshotFullKey(s.Number, s.Hash), blob) }) } diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index 61f67f1c678..c56e9420108 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -7,12 +7,15 @@ import ( "time" common2 "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcfg" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" @@ -50,7 +53,7 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string) error { return nil } -func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br services.HeaderAndCanonicalReader, tmpdir string) error { +func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br services.FullBlockReader, dirs datadir.Dirs, cc params.ChainConfig, engine consensus.Engine) error { // keep Genesis if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil { return err @@ -100,7 +103,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, br } if snapshots != nil && snapshots.Cfg().Enabled && snapshots.BlocksAvailable() > 0 { - if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, tmpdir, snapshots, br); err != nil { + if err := stagedsync.FillDBFromSnapshots("fillind_db_from_snapshots", context.Background(), tx, dirs, snapshots, br, cc, engine); err != nil { return err } _ = stages.SaveStageProgress(tx, stages.Snapshots, snapshots.BlocksAvailable()) diff --git a/eth/backend.go b/eth/backend.go index 9bb728beb64..c5d3990c3a8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -640,7 +640,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator) + backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator, backend.engine) if err != nil { return nil, err } diff --git a/eth/ethconsensusconfig/config.go b/eth/ethconsensusconfig/config.go index 2d52c260050..45bc6918b74 100644 --- a/eth/ethconsensusconfig/config.go +++ b/eth/ethconsensusconfig/config.go @@ -1,9 +1,10 @@ package ethconsensusconfig import ( - "github.com/ledgerwatch/erigon-lib/kv" "path/filepath" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/davecgh/go-spew/spew" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/aura" diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index f3466db2251..c5154e77f3b 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -108,9 +108,6 @@ func ExecV3(ctx context.Context, logger log.Logger, maxBlockNum uint64, ) (err error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - batchSize, chainDb := cfg.batchSize, cfg.db blockReader := cfg.blockReader agg, engine := cfg.agg, cfg.engine @@ -469,6 +466,9 @@ func ExecV3(ctx context.Context, applyWorker.ResetTx(applyTx) } + _, isPoSa := cfg.engine.(consensus.PoSA) + //isBor := cfg.chainConfig.Bor != nil + var b *types.Block var blockNum uint64 Loop: @@ -576,11 +576,11 @@ Loop: count++ applyWorker.RunTxTask(txTask) if err := func() error { - if txTask.Final { + if txTask.Final && !isPoSa { gasUsed += txTask.UsedGas if gasUsed != txTask.Header.GasUsed { if txTask.BlockNum > 0 { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec - return fmt.Errorf("gas used by execution: %d, in header: %d", gasUsed, txTask.Header.GasUsed) + return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x", gasUsed, txTask.Header.GasUsed, txTask.Header.Number.Uint64(), txTask.Header.Hash()) } } gasUsed = 0 diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index cda2f46366f..95e227874f2 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -20,6 +20,8 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/parlia" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" @@ -42,8 +44,10 @@ type SnapshotsCfg struct { snapshotDownloader proto_downloader.DownloaderClient blockReader services.FullBlockReader dbEventNotifier snapshotsync.DBEventNotifier - historyV3 bool - agg *state.Aggregator22 + engine consensus.Engine + + historyV3 bool + agg *state.Aggregator22 } func StageSnapshotsCfg( @@ -55,6 +59,7 @@ func StageSnapshotsCfg( snapshotDownloader proto_downloader.DownloaderClient, blockReader services.FullBlockReader, dbEventNotifier snapshotsync.DBEventNotifier, + engine consensus.Engine, historyV3 bool, agg *state.Aggregator22, ) SnapshotsCfg { @@ -69,6 +74,7 @@ func StageSnapshotsCfg( dbEventNotifier: dbEventNotifier, historyV3: historyV3, agg: agg, + engine: engine, } } @@ -174,13 +180,14 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } s.BlockNumber = blocksAvailable } - if err := FillDBFromSnapshots(s.LogPrefix(), ctx, tx, cfg.dirs.Tmp, cfg.snapshots, cfg.blockReader); err != nil { + + if err := FillDBFromSnapshots(s.LogPrefix(), ctx, tx, cfg.dirs, cfg.snapshots, cfg.blockReader, cfg.chainConfig, cfg.engine); err != nil { return err } return nil } -func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpdir string, sn *snapshotsync.RoSnapshots, blockReader services.HeaderAndCanonicalReader) error { +func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs datadir.Dirs, sn *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, chainConfig params.ChainConfig, engine consensus.Engine) error { blocksAvailable := sn.BlocksAvailable() logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -199,13 +206,15 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpd } switch stage { case stages.Headers: - h2n := etl.NewCollector("Snapshots", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize)) + h2n := etl.NewCollector(logPrefix, dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize)) defer h2n.Close() h2n.LogLvl(log.LvlDebug) // fill some small tables from snapshots, in future we may store this data in snapshots also, but // for now easier just store them in db td := big.NewInt(0) + blockNumBytes := make([]byte, 8) + chainReader := &ChainReaderImpl{config: &chainConfig, tx: tx, blockReader: blockReader} if err := snapshotsync.ForEachHeader(ctx, sn, func(header *types.Header) error { blockNum, blockHash := header.Number.Uint64(), header.Hash() td.Add(td, header.Difficulty) @@ -216,9 +225,25 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpd if err := rawdb.WriteCanonicalHash(tx, blockHash, blockNum); err != nil { return err } - if err := h2n.Collect(blockHash[:], libcommon.EncodeTs(blockNum)); err != nil { + binary.BigEndian.PutUint64(blockNumBytes, blockNum) + if err := h2n.Collect(blockHash[:], blockNumBytes); err != nil { return err } + + if engine != nil { + // consensus may have own database, let's fill it + // different consensuses may have some conditions for validators snapshots + need := false + switch engine.(type) { + case *parlia.Parlia: + need = (blockNum-1)%(100*parlia.CheckpointInterval) == 0 + } + if need { + if err := engine.VerifyHeader(chainReader, header, true /* seal */); err != nil { + return err + } + } + } select { case <-ctx.Done(): return ctx.Err() @@ -240,6 +265,7 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, tmpd if err = rawdb.WriteHeadHeaderHash(tx, canonicalHash); err != nil { return err } + case stages.Bodies: // ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value) ok, err := sn.ViewTxs(blocksAvailable, func(sn *snapshotsync.TxnSegment) error { diff --git a/migrations/reset_blocks.go b/migrations/reset_blocks.go index b56d4f1f96c..39eecccbeb4 100644 --- a/migrations/reset_blocks.go +++ b/migrations/reset_blocks.go @@ -6,6 +6,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/rawdb/rawdbreset" @@ -110,7 +111,8 @@ var resetBlocks4 = Migration{ log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long") } - if err := rawdbreset.ResetBlocks(tx, db, nil, nil, dirs.Tmp); err != nil { + cc := tool.ChainConfig(tx) + if err := rawdbreset.ResetBlocks(tx, db, nil, nil, dirs, *cc, nil); err != nil { return err } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 6a94ecef905..b4b94bd3144 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -346,7 +346,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, mock.BlockSnapshots, mock.DB, snapshotsDownloader, mock.Notifications.Events) mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, prune, - stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, mock.BlockSnapshots, blockRetire, snapshotsDownloader, blockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg), + stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, mock.BlockSnapshots, blockRetire, snapshotsDownloader, blockReader, mock.Notifications.Events, mock.Engine, mock.HistoryV3, mock.agg), stagedsync.StageHeadersCfg( mock.DB, mock.sentriesClient.Hd, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index adb284b8fa0..c99805a996a 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon/cmd/sentry/sentry" @@ -389,6 +390,7 @@ func NewStagedSync(ctx context.Context, snapshots *snapshotsync.RoSnapshots, agg *state.Aggregator22, forkValidator *engineapi.ForkValidator, + engine consensus.Engine, ) (*stagedsync.Sync, error) { dirs := cfg.Dirs var blockReader services.FullBlockReader @@ -405,7 +407,7 @@ func NewStagedSync(ctx context.Context, return stagedsync.New( stagedsync.DefaultStages(ctx, cfg.Prune, - stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, dirs, snapshots, blockRetire, snapDownloader, blockReader, notifications.Events, cfg.HistoryV3, agg), + stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, dirs, snapshots, blockRetire, snapDownloader, blockReader, notifications.Events, engine, cfg.HistoryV3, agg), stagedsync.StageHeadersCfg( db, controlServer.Hd,