From 90d25514af32f4e489d71fddf43c7f66c7de517b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Feb 2023 12:17:34 +0200 Subject: [PATCH] core, eth: merge snap-sync chain download progress logs (#26676) --- core/blockchain.go | 2 +- core/headerchain.go | 2 +- core/rawdb/accessors_chain.go | 26 +++++++------- core/rawdb/ancient_scheme.go | 30 ++++++++-------- core/rawdb/chain_freezer.go | 10 +++--- core/rawdb/chain_iterator.go | 2 +- core/rawdb/database.go | 2 +- eth/downloader/downloader.go | 65 ++++++++++++++++++++++++++++++++++- eth/downloader/queue.go | 9 ++--- 9 files changed, 106 insertions(+), 42 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 98d2e7a774ad..38a129d4eec5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1278,7 +1278,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if stats.ignored > 0 { context = append(context, []interface{}{"ignored", stats.ignored}...) } - log.Info("Imported new block receipts", context...) + log.Debug("Imported new block receipts", context...) return 0, nil } diff --git a/core/headerchain.go b/core/headerchain.go index d40d26f72bf7..aed3c720c633 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -389,7 +389,7 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time, if res.ignored > 0 { context = append(context, []interface{}{"ignored", res.ignored}...) } - log.Info("Imported new block headers", context...) + log.Debug("Imported new block headers", context...) return res.status, err } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 9b90d8f20cc4..e4dac3407fc5 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -37,7 +37,7 @@ import ( func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { var data []byte db.ReadAncients(func(reader ethdb.AncientReaderOp) error { - data, _ = reader.Ancient(chainFreezerHashTable, number) + data, _ = reader.Ancient(ChainFreezerHashTable, number) if len(data) == 0 { // Get it by hash from leveldb data, _ = db.Get(headerHashKey(number)) @@ -334,7 +334,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu } // read remaining from ancients max := count * 700 - data, err := db.AncientRange(chainFreezerHeaderTable, i+1-count, count, max) + data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, max) if err == nil && uint64(len(data)) == count { // the data is on the order [h, h+1, .., n] -- reordering needed for i := range data { @@ -351,7 +351,7 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu // First try to look up the data in ancient database. Extra hash // comparison is necessary since ancient database only maintains // the canonical data. - data, _ = reader.Ancient(chainFreezerHeaderTable, number) + data, _ = reader.Ancient(ChainFreezerHeaderTable, number) if len(data) > 0 && crypto.Keccak256Hash(data) == hash { return nil } @@ -427,7 +427,7 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number // isCanon is an internal utility method, to check whether the given number/hash // is part of the ancient (canon) set. 
func isCanon(reader ethdb.AncientReaderOp, number uint64, hash common.Hash) bool { - h, err := reader.Ancient(chainFreezerHashTable, number) + h, err := reader.Ancient(ChainFreezerHashTable, number) if err != nil { return false } @@ -443,7 +443,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue db.ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { - data, _ = reader.Ancient(chainFreezerBodiesTable, number) + data, _ = reader.Ancient(ChainFreezerBodiesTable, number) return nil } // If not, try reading from leveldb @@ -458,7 +458,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { var data []byte db.ReadAncients(func(reader ethdb.AncientReaderOp) error { - data, _ = reader.Ancient(chainFreezerBodiesTable, number) + data, _ = reader.Ancient(ChainFreezerBodiesTable, number) if len(data) > 0 { return nil } @@ -526,7 +526,7 @@ func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { db.ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { - data, _ = reader.Ancient(chainFreezerDifficultyTable, number) + data, _ = reader.Ancient(ChainFreezerDifficultyTable, number) return nil } // If not, try reading from leveldb @@ -586,7 +586,7 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa db.ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { - data, _ = reader.Ancient(chainFreezerReceiptTable, number) + data, _ = reader.Ancient(ChainFreezerReceiptTable, number) return nil } // If not, try reading from leveldb @@ -787,19 +787,19 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error { num := block.NumberU64() - if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil { + if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil { return fmt.Errorf("can't add block %d hash: %v", num, err) } - if err := op.Append(chainFreezerHeaderTable, num, header); err != nil { + if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil { return fmt.Errorf("can't append block header %d: %v", num, err) } - if err := op.Append(chainFreezerBodiesTable, num, block.Body()); err != nil { + if err := op.Append(ChainFreezerBodiesTable, num, block.Body()); err != nil { return fmt.Errorf("can't append block body %d: %v", num, err) } - if err := op.Append(chainFreezerReceiptTable, num, receipts); err != nil { + if err := op.Append(ChainFreezerReceiptTable, num, receipts); err != nil { return fmt.Errorf("can't append block %d receipts: %v", num, err) } - if err := op.Append(chainFreezerDifficultyTable, num, td); err != nil { + if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil { return fmt.Errorf("can't append block %d total difficulty: %v", num, err) } return nil diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index 047b504a24bc..b0428c5f5bd9 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -18,30 +18,30 @@ package rawdb // The list of table names of chain freezer. 
const ( - // chainFreezerHeaderTable indicates the name of the freezer header table. - chainFreezerHeaderTable = "headers" + // ChainFreezerHeaderTable indicates the name of the freezer header table. + ChainFreezerHeaderTable = "headers" - // chainFreezerHashTable indicates the name of the freezer canonical hash table. - chainFreezerHashTable = "hashes" + // ChainFreezerHashTable indicates the name of the freezer canonical hash table. + ChainFreezerHashTable = "hashes" - // chainFreezerBodiesTable indicates the name of the freezer block body table. - chainFreezerBodiesTable = "bodies" + // ChainFreezerBodiesTable indicates the name of the freezer block body table. + ChainFreezerBodiesTable = "bodies" - // chainFreezerReceiptTable indicates the name of the freezer receipts table. - chainFreezerReceiptTable = "receipts" + // ChainFreezerReceiptTable indicates the name of the freezer receipts table. + ChainFreezerReceiptTable = "receipts" - // chainFreezerDifficultyTable indicates the name of the freezer total difficulty table. - chainFreezerDifficultyTable = "diffs" + // ChainFreezerDifficultyTable indicates the name of the freezer total difficulty table. + ChainFreezerDifficultyTable = "diffs" ) // chainFreezerNoSnappy configures whether compression is disabled for the ancient-tables. // Hashes and difficulties don't compress well. var chainFreezerNoSnappy = map[string]bool{ - chainFreezerHeaderTable: false, - chainFreezerHashTable: true, - chainFreezerBodiesTable: false, - chainFreezerReceiptTable: false, - chainFreezerDifficultyTable: true, + ChainFreezerHeaderTable: false, + ChainFreezerHashTable: true, + ChainFreezerBodiesTable: false, + ChainFreezerReceiptTable: false, + ChainFreezerDifficultyTable: true, } // The list of identifiers of ancient stores. diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index 738295cfb702..920a0ab59661 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -280,19 +280,19 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash } // Write to the batch. 
- if err := op.AppendRaw(chainFreezerHashTable, number, hash[:]); err != nil { + if err := op.AppendRaw(ChainFreezerHashTable, number, hash[:]); err != nil { return fmt.Errorf("can't write hash to Freezer: %v", err) } - if err := op.AppendRaw(chainFreezerHeaderTable, number, header); err != nil { + if err := op.AppendRaw(ChainFreezerHeaderTable, number, header); err != nil { return fmt.Errorf("can't write header to Freezer: %v", err) } - if err := op.AppendRaw(chainFreezerBodiesTable, number, body); err != nil { + if err := op.AppendRaw(ChainFreezerBodiesTable, number, body); err != nil { return fmt.Errorf("can't write body to Freezer: %v", err) } - if err := op.AppendRaw(chainFreezerReceiptTable, number, receipts); err != nil { + if err := op.AppendRaw(ChainFreezerReceiptTable, number, receipts); err != nil { return fmt.Errorf("can't write receipts to Freezer: %v", err) } - if err := op.AppendRaw(chainFreezerDifficultyTable, number, td); err != nil { + if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil { return fmt.Errorf("can't write td to Freezer: %v", err) } diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 85ad88e29172..102943516eff 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) { if i+count > frozen { count = frozen - i } - data, err := db.AncientRange(chainFreezerHashTable, i, count, 32*count) + data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count) if err != nil { log.Crit("Failed to init database from freezer", "err", err) } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index ef80c251a457..a27b45e4d7af 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -231,7 +231,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // If the freezer already contains something, ensure that the genesis blocks // match, otherwise we might mix up freezers across chains and destroy both // the freezer and the key-value store. - frgenesis, err := frdb.Ancient(chainFreezerHashTable, 0) + frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0) if err != nil { return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) } else if !bytes.Equal(kvgenesis, frgenesis) { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f7790b2d80f0..bb74efe754e7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -154,6 +154,11 @@ type Downloader struct { bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) + + // Progress reporting metrics + syncStartBlock uint64 // Head snap block when Geth was started + syncStartTime time.Time // Time instance when chain sync started + syncLogTime time.Time // Time instance when status was last reported } // LightChain encapsulates functions required to synchronise a light chain. 
@@ -231,7 +236,9 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl quitCh: make(chan struct{}), SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), stateSyncStart: make(chan *stateSync), + syncStartBlock: chain.CurrentFastBlock().NumberU64(), } + // Create the post-merge skeleton syncer and start the process dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) go dl.stateFetcher() @@ -1614,6 +1621,7 @@ func (d *Downloader) processSnapSyncContent() error { if len(results) == 0 { // If pivot sync is done, stop if oldPivot == nil { + d.reportSnapSyncProgress(true) return sync.Cancel() } // If sync failed, stop @@ -1627,6 +1635,8 @@ func (d *Downloader) processSnapSyncContent() error { if d.chainInsertHook != nil { d.chainInsertHook(results) } + d.reportSnapSyncProgress(false) + // If we haven't downloaded the pivot block yet, check pivot staleness // notifications from the header downloader d.pivotLock.RLock() @@ -1739,7 +1749,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state } default: } - // Retrieve the a batch of results to import + // Retrieve the batch of results to import first, last := results[0].Header, results[len(results)-1].Header log.Debug("Inserting snap-sync blocks", "items", len(results), "firstnum", first.Number, "firsthash", first.Hash(), @@ -1820,3 +1830,56 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea } return headers } + +// reportSnapSyncProgress calculates various status reports and provides it to the user. +func (d *Downloader) reportSnapSyncProgress(force bool) { + // Initialize the sync start time if it's the first time we're reporting + if d.syncStartTime.IsZero() { + d.syncStartTime = time.Now().Add(-time.Millisecond) // -1ms offset to avoid division by zero + } + // Don't report all the events, just occasionally + if !force && time.Since(d.syncLogTime) < 8*time.Second { + return + } + // Don't report anything until we have a meaningful progress + var ( + headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable) + bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable) + receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable) + ) + syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes) + if syncedBytes == 0 { + return + } + var ( + header = d.blockchain.CurrentHeader() + block = d.blockchain.CurrentFastBlock() + ) + syncedBlocks := block.NumberU64() - d.syncStartBlock + if syncedBlocks == 0 { + return + } + // Retrieve the current chain head and calculate the ETA + latest, _, err := d.skeleton.Bounds() + if err != nil { + // We're going to cheat for non-merged networks, but that's fine + latest = d.pivotHeader + } + if latest == nil { + // This should really never happen, but add some defensive code for now. + // TODO(karalabe): Remove it eventually if we don't see it blow. 
+ log.Error("Nil latest block in sync progress report") + return + } + var ( + left = latest.Number.Uint64() - block.NumberU64() + eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left) + + progress = fmt.Sprintf("%.2f%%", float64(block.NumberU64())*100/float64(latest.Number.Uint64())) + headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString()) + bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(bodyBytes).TerminalString()) + receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(receiptBytes).TerminalString()) + ) + log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta)) + d.syncLogTime = time.Now() +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 1f676e655031..5af5068c98cf 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -144,7 +144,7 @@ type queue struct { active *sync.Cond closed bool - lastStatLog time.Time + logTime time.Time // Time instance when status was last reported } // newQueue creates a new download queue for scheduling block retrieval. @@ -390,11 +390,12 @@ func (q *queue) Results(block bool) []*fetchResult { } } // Log some info at certain times - if time.Since(q.lastStatLog) > 60*time.Second { - q.lastStatLog = time.Now() + if time.Since(q.logTime) >= 60*time.Second { + q.logTime = time.Now() + info := q.Stats() info = append(info, "throttle", throttleThreshold) - log.Info("Downloader queue stats", info...) + log.Debug("Downloader queue stats", info...) } return results }
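
The new reportSnapSyncProgress helper above drives a single periodic "Syncing: chain download in progress" log line, replacing the per-batch header/receipt import and queue-stats logs that this patch demotes to Debug. Its ETA is a plain linear extrapolation: average time per block downloaded so far, multiplied by the blocks still missing, with the on-disk sizes of the (now exported) rawdb freezer tables reported alongside. The standalone Go sketch below reproduces just that arithmetic; the block heights and elapsed time are made-up illustration values, not anything taken from the patch.

// eta_sketch.go: a minimal sketch of the progress/ETA arithmetic used by
// reportSnapSyncProgress. All concrete numbers are hypothetical.
package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		syncStartBlock uint64 = 15_000_000    // head snap block when the node started (hypothetical)
		currentBlock   uint64 = 15_750_000    // current snap head (hypothetical)
		latestBlock    uint64 = 16_500_000    // chain head reported by the skeleton (hypothetical)
		elapsed               = 2 * time.Hour // time spent syncing so far (hypothetical)
	)
	syncedBlocks := currentBlock - syncStartBlock
	left := latestBlock - currentBlock

	// Linear extrapolation, mirroring the patch: average time per block so far,
	// multiplied by the number of blocks still left to download.
	eta := elapsed / time.Duration(syncedBlocks) * time.Duration(left)

	// Progress is the current snap head as a percentage of the chain head.
	progress := float64(currentBlock) * 100 / float64(latestBlock)

	fmt.Printf("synced=%.2f%% eta=%v\n", progress, eta.Round(time.Second))
}

Because syncStartBlock is sampled from the head at startup rather than from genesis, syncedBlocks only counts blocks downloaded in the current run, so the extrapolated ETA reflects the present download rate even after a restart.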