Skip to content

Commit

Permalink
add logs in recoverFromDb func (#2769)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Oct 4, 2021
1 parent b890e97 commit de16cb9
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 9 deletions.
72 changes: 72 additions & 0 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/sentry/download"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/core"
Expand All @@ -38,6 +39,23 @@ import (
"github.com/spf13/cobra"
)

var cmdStageHeaders = &cobra.Command{
Use: "stage_headers",
Short: "",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := utils.RootContext()
logger := log.New()
db := openDB(chaindata, logger, true)
defer db.Close()

if err := stageHeaders(db, ctx); err != nil {
log.Error("Error", "err", err)
return err
}
return nil
},
}

var cmdStageBodies = &cobra.Command{
Use: "stage_bodies",
Short: "",
Expand Down Expand Up @@ -276,6 +294,12 @@ func init() {

rootCmd.AddCommand(cmdStageSenders)

withDatadir(cmdStageHeaders)
withUnwind(cmdStageHeaders)
withChain(cmdStageHeaders)

rootCmd.AddCommand(cmdStageHeaders)

withDatadir(cmdStageBodies)
withUnwind(cmdStageBodies)
withChain(cmdStageBodies)
Expand Down Expand Up @@ -376,6 +400,54 @@ func init() {
rootCmd.AddCommand(cmdSetPrune)
}

func stageHeaders(db kv.RwDB, ctx context.Context) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if unwind > 0 {
progress, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return fmt.Errorf("read Bodies progress: %w", err)
}
if unwind > progress {
return fmt.Errorf("cannot unwind past 0")
}
if err = stages.SaveStageProgress(tx, stages.Headers, progress-unwind); err != nil {
return fmt.Errorf("saving Bodies progress failed: %w", err)
}
progress, err = stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return fmt.Errorf("re-read Bodies progress: %w", err)
}
// remove all canonical markers from this point
if err := tx.ForEach(kv.HeaderCanonical, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error {
return tx.Delete(kv.HeaderCanonical, k, nil)
}); err != nil {
return err
}
if err := tx.ForEach(kv.Headers, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error {
return tx.Delete(kv.Headers, k, nil)
}); err != nil {
return err
}
if err := tx.ForEach(kv.HeaderTD, dbutils.EncodeBlockNumber(progress+1), func(k, v []byte) error {
return tx.Delete(kv.HeaderTD, k, nil)
}); err != nil {
return err
}
hash, err := rawdb.ReadCanonicalHash(tx, progress-1)
if err != nil {
return err
}
if err = tx.Put(kv.HeadHeaderKey, []byte(kv.HeadHeaderKey), hash[:]); err != nil {
log.Error("ReadHeadHeaderHash failed", "err", err)
}
log.Info("Progress", "headers", progress)
return nil
}
log.Info("This command only works with --unwind option")
return nil
})
}

func stageBodies(db kv.RwDB, ctx context.Context) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if unwind > 0 {
Expand Down
44 changes: 39 additions & 5 deletions cmd/rpcdaemon/test.http
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,46 @@
POST localhost:8545
Content-Type: application/json

{ "jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id":1}
{
"jsonrpc": "2.0",
"method": "eth_syncing",
"params": [],
"id": 1
}

###

POST localhost:8545
Content-Type: application/json

{"jsonrpc":"2.0","method":"eth_getBalance","params": ["0xfffa4763f94f7ad191b366a343092a5d1a47ed08", "0xde84"],"id":1}
{
"jsonrpc": "2.0",
"method": "eth_getBalance",
"params": [
"0xfffa4763f94f7ad191b366a343092a5d1a47ed08",
"0xde84"
],
"id": 1
}

###

POST localhost:8545
Content-Type: application/json

{ "jsonrpc": "2.0", "method": "debug_accountRange", "params": ["0x1e8480", "", 256, false, false, false], "id":1}
{
"jsonrpc": "2.0",
"method": "debug_accountRange",
"params": [
"0x1e8480",
"",
256,
false,
false,
false
],
"id": 1
}

###

Expand All @@ -30,7 +55,7 @@ Content-Type: application/json
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [
"0x1b4",
"0x0",
true
],
"id": 1
Expand All @@ -54,7 +79,15 @@ Content-Type: application/json
POST localhost:8545
Content-Type: application/json

{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0xf4240",true],"id":2}
{
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [
"0xf4240",
true
],
"id": 2
}

###

Expand All @@ -73,6 +106,7 @@ Content-Type: application/json
],
"id": 537758
}

### > 60

### >20
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ Loop:
if stopped {
return libcommon.ErrStopped
}
// We do not print the followin line if the stage was interrupted
// We do not print the following line if the stage was interrupted
log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest inserted", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0)))
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ func (hd *HeaderDownload) SetPreverifiedHashes(preverifiedHashes map[common.Hash
func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error {
hd.lock.Lock()
defer hd.lock.Unlock()

logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

// Drain persistedLinksQueue and remove links
for hd.persistedLinkQueue.Len() > 0 {
link := heap.Pop(hd.persistedLinkQueue).(*Link)
Expand All @@ -473,6 +477,12 @@ func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error {
return err
}
hd.addHeaderAsLink(&h, true /* persisted */)

select {
case <-logEvery.C:
log.Info("recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len())
default:
}
}
hd.highestInDb, err = stages.GetStageProgress(tx, stages.Headers)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func StageLoopStep(
var headTd *big.Int
var head uint64
var headHash common.Hash
if head, err = stages.GetStageProgress(rotx, stages.Finish); err != nil {
if head, err = stages.GetStageProgress(rotx, stages.Headers); err != nil {
return err
}
if headHash, err = rawdb.ReadCanonicalHash(rotx, head); err != nil {
Expand All @@ -165,8 +165,8 @@ func StageLoopStep(
}
}
rotx.Rollback()
headTd256 := new(uint256.Int)
overflow := headTd256.SetFromBig(headTd)

headTd256, overflow := uint256.FromBig(headTd)
if overflow {
return fmt.Errorf("headTds higher than 2^256-1")
}
Expand Down

0 comments on commit de16cb9

Please sign in to comment.