diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 650acd64cf1..813726ea4cf 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/core" @@ -38,6 +39,23 @@ import ( "github.com/spf13/cobra" ) +var cmdStageHeaders = &cobra.Command{ + Use: "stage_headers", + Short: "", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, _ := utils.RootContext() + logger := log.New() + db := openDB(chaindata, logger, true) + defer db.Close() + + if err := stageHeaders(db, ctx); err != nil { + log.Error("Error", "err", err) + return err + } + return nil + }, +} + var cmdStageBodies = &cobra.Command{ Use: "stage_bodies", Short: "", @@ -276,6 +294,12 @@ func init() { rootCmd.AddCommand(cmdStageSenders) + withDatadir(cmdStageHeaders) + withUnwind(cmdStageHeaders) + withChain(cmdStageHeaders) + + rootCmd.AddCommand(cmdStageHeaders) + withDatadir(cmdStageBodies) withUnwind(cmdStageBodies) withChain(cmdStageBodies) @@ -376,6 +400,54 @@ func init() { rootCmd.AddCommand(cmdSetPrune) } +func stageHeaders(db kv.RwDB, ctx context.Context) error { + return db.Update(ctx, func(tx kv.RwTx) error { + if unwind > 0 { + progress, err := stages.GetStageProgress(tx, stages.Headers) + if err != nil { + return fmt.Errorf("read Bodies progress: %w", err) + } + if unwind > progress { + return fmt.Errorf("cannot unwind past 0") + } + if err = stages.SaveStageProgress(tx, stages.Headers, progress-unwind); err != nil { + return fmt.Errorf("saving Bodies progress failed: %w", err) + } + progress, err = stages.GetStageProgress(tx, stages.Headers) + if err != nil { + return fmt.Errorf("re-read Bodies progress: %w", err) + } + // remove all canonical markers from this point + if err := tx.ForEach(kv.HeaderCanonical, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error { + return tx.Delete(kv.HeaderCanonical, k, nil) + }); err != nil { + return err + } + if err := tx.ForEach(kv.Headers, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error { + return tx.Delete(kv.Headers, k, nil) + }); err != nil { + return err + } + if err := tx.ForEach(kv.HeaderTD, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error { + return tx.Delete(kv.HeaderTD, k, nil) + }); err != nil { + return err + } + hash, err := rawdb.ReadCanonicalHash(tx, progress-1) + if err != nil { + return err + } + if err = tx.Put(kv.HeadHeaderKey, []byte(kv.HeadHeaderKey), hash[:]); err != nil { + log.Error("ReadHeadHeaderHash failed", "err", err) + } + log.Info("Progress", "headers", progress) + return nil + } + log.Info("This command only works with --unwind option") + return nil + }) +} + func stageBodies(db kv.RwDB, ctx context.Context) error { return db.Update(ctx, func(tx kv.RwTx) error { if unwind > 0 { diff --git a/cmd/rpcdaemon/test.http b/cmd/rpcdaemon/test.http index 41e780cb183..be0cbe9cfe4 100644 --- a/cmd/rpcdaemon/test.http +++ b/cmd/rpcdaemon/test.http @@ -4,21 +4,46 @@ POST localhost:8545 Content-Type: application/json -{ "jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id":1} +{ + "jsonrpc": "2.0", + "method": "eth_syncing", + "params": [], + "id": 1 +} ### POST localhost:8545 Content-Type: application/json -{"jsonrpc":"2.0","method":"eth_getBalance","params": ["0xfffa4763f94f7ad191b366a343092a5d1a47ed08", "0xde84"],"id":1} +{ + "jsonrpc": "2.0", + "method": "eth_getBalance", + "params": [ + "0xfffa4763f94f7ad191b366a343092a5d1a47ed08", + "0xde84" + ], + "id": 1 +} ### POST localhost:8545 Content-Type: application/json -{ "jsonrpc": "2.0", "method": "debug_accountRange", "params": ["0x1e8480", "", 256, false, false, false], "id":1} +{ + "jsonrpc": "2.0", + "method": "debug_accountRange", + "params": [ + "0x1e8480", + "", + 256, + false, + false, + false + ], + "id": 1 +} ### @@ -30,7 +55,7 @@ Content-Type: application/json "jsonrpc": "2.0", "method": "eth_getBlockByNumber", "params": [ - "0x1b4", + "0x0", true ], "id": 1 @@ -54,7 +79,15 @@ Content-Type: application/json POST localhost:8545 Content-Type: application/json -{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0xf4240",true],"id":2} +{ + "jsonrpc": "2.0", + "method": "eth_getBlockByNumber", + "params": [ + "0xf4240", + true + ], + "id": 2 +} ### @@ -73,6 +106,7 @@ Content-Type: application/json ], "id": 537758 } + ### > 60 ### >20 diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index fb6069dadfb..584ee79bfa9 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -216,7 +216,7 @@ Loop: if stopped { return libcommon.ErrStopped } - // We do not print the followin line if the stage was interrupted + // We do not print the following line if the stage was interrupted log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest inserted", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0))) return nil } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 5b19b500e5e..769e6ba4177 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -453,6 +453,10 @@ func (hd *HeaderDownload) SetPreverifiedHashes(preverifiedHashes map[common.Hash func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error { hd.lock.Lock() defer hd.lock.Unlock() + + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + // Drain persistedLinksQueue and remove links for hd.persistedLinkQueue.Len() > 0 { link := heap.Pop(hd.persistedLinkQueue).(*Link) @@ -473,6 +477,12 @@ func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error { return err } hd.addHeaderAsLink(&h, true /* persisted */) + + select { + case <-logEvery.C: + log.Info("recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len()) + default: + } } hd.highestInDb, err = stages.GetStageProgress(tx, stages.Headers) if err != nil { diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 9e7f71a2300..99d892830e7 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -148,7 +148,7 @@ func StageLoopStep( var headTd *big.Int var head uint64 var headHash common.Hash - if head, err = stages.GetStageProgress(rotx, stages.Finish); err != nil { + if head, err = stages.GetStageProgress(rotx, stages.Headers); err != nil { return err } if headHash, err = rawdb.ReadCanonicalHash(rotx, head); err != nil { @@ -165,8 +165,8 @@ func StageLoopStep( } } rotx.Rollback() - headTd256 := new(uint256.Int) - overflow := headTd256.SetFromBig(headTd) + + headTd256, overflow := uint256.FromBig(headTd) if overflow { return fmt.Errorf("headTds higher than 2^256-1") }