Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[devnet] separare logging - headers download #7551

Merged
merged 2 commits into from
May 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[devnet] separare logging - headers download
  • Loading branch information
Alex Sharp authored and Alex Sharp committed May 19, 2023
commit a203a70f8dd08e7bbe7790eb0b747061c0551c25
1 change: 1 addition & 0 deletions cmd/sentry/sentry/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func NewMultiClient(
1024*1024, /* linkLimit */
engine,
blockReader,
logger,
)
if chainConfig.TerminalTotalDifficultyPassed {
hd.SetPOSSync(true)
Expand Down
20 changes: 12 additions & 8 deletions ethdb/privateapi/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,16 @@ func makeTestDb(ctx context.Context, db kv.RwDB) {
}

func TestMockDownloadRequest(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)

makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)

var err error
var reply *remote.EnginePayloadStatus
Expand Down Expand Up @@ -145,17 +146,18 @@ func TestMockDownloadRequest(t *testing.T) {
}

func TestMockValidExecution(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)

makeTestDb(ctx, db)

hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)

events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)

var err error
var reply *remote.EnginePayloadStatus
Expand All @@ -181,17 +183,18 @@ func TestMockValidExecution(t *testing.T) {
}

func TestMockInvalidExecution(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)

makeTestDb(ctx, db)

hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)

events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)

var err error
var reply *remote.EnginePayloadStatus
Expand All @@ -217,16 +220,17 @@ func TestMockInvalidExecution(t *testing.T) {
}

func TestNoTTD(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)

makeTestDb(ctx, db)

hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)

events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{}, nil, hd, false, logger)

var err error

Expand Down
52 changes: 26 additions & 26 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (hd *HeaderDownload) SingleHeaderAsSegment(headerRaw []byte, header *types.

headerHash := types.RawRlpHash(headerRaw)
if _, bad := hd.badHeaders[headerHash]; bad {
log.Warn("[downloader] Rejected header marked as bad", "hash", headerHash, "height", header.Number.Uint64())
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", headerHash, "height", header.Number.Uint64())
return nil, BadBlockPenalty, nil
}
if penalizePoSBlocks && header.Difficulty.Sign() == 0 {
Expand Down Expand Up @@ -300,7 +300,7 @@ func (hd *HeaderDownload) logAnchorState() {
ss = append(ss, sb.String())
}
sort.Strings(ss)
log.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
for _, s := range ss {
log.Debug(s)
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error {

select {
case <-logEvery.C:
log.Info("[downloader] recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len())
hd.logger.Info("[downloader] recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len())
default:
}
}
Expand All @@ -377,7 +377,7 @@ func (hd *HeaderDownload) ReadProgressFromDb(tx kv.RwTx) (err error) {
}

func (hd *HeaderDownload) invalidateAnchor(anchor *Anchor, reason string) {
log.Debug("[downloader] Invalidating anchor", "height", anchor.blockHeight, "hash", anchor.parentHash, "reason", reason)
hd.logger.Debug("[downloader] Invalidating anchor", "height", anchor.blockHeight, "hash", anchor.parentHash, "reason", reason)
hd.removeAnchor(anchor)
for child := anchor.fLink; child != nil; child, child.next = child.next, nil {
hd.removeUpwards(child)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ
func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) {
anchor := hd.posAnchor
if anchor == nil {
log.Debug("[downloader] No PoS anchor")
hd.logger.Debug("[downloader] No PoS anchor")
return
}

Expand All @@ -427,7 +427,7 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo
// TODO: [pos-downloader-tweaks] - we could reduce this number, or config it
timeout = anchor.timeouts >= 3
if timeout {
log.Warn("[downloader] Timeout", "requestId", hd.requestId, "peerID", common.Bytes2Hex(anchor.peerID[:]))
hd.logger.Warn("[downloader] Timeout", "requestId", hd.requestId, "peerID", common.Bytes2Hex(anchor.peerID[:]))
penalties = []PenaltyItem{{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}}
return
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.T
func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest {
hd.lock.RLock()
defer hd.lock.RUnlock()
log.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb)
hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb)
var stride uint64
if hd.initialCycle {
stride = 192
Expand Down Expand Up @@ -519,18 +519,18 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
hd.moveLinkToQueue(link, NoQueue)
delete(hd.links, link.hash)
hd.removeUpwards(link)
log.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight)
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight)
return true, false, 0, lastTime, nil
}
if !link.verified {
if err := hd.VerifyHeader(link.header); err != nil {
hd.badPoSHeaders[link.hash] = link.header.ParentHash
if errors.Is(err, consensus.ErrFutureBlock) {
// This may become valid later
log.Warn("[downloader] Added future link", "hash", link.hash, "height", link.blockHeight, "timestamp", link.header.Time)
hd.logger.Warn("[downloader] Added future link", "hash", link.hash, "height", link.blockHeight, "timestamp", link.header.Time)
return false, false, 0, lastTime, nil // prevent removal of the link from the hd.linkQueue
} else {
log.Debug("[downloader] Verification failed for header", "hash", link.hash, "height", link.blockHeight, "err", err)
hd.logger.Debug("[downloader] Verification failed for header", "hash", link.hash, "height", link.blockHeight, "err", err)
hd.moveLinkToQueue(link, NoQueue)
delete(hd.links, link.hash)
hd.removeUpwards(link)
Expand All @@ -542,7 +542,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
// Make sure long insertions do not appear as a stuck stage 1
select {
case <-logChannel:
log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len())
hd.logger.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len())
default:
}
td, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight)
Expand All @@ -569,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult

if link.blockHeight > hd.highestInDb {
if hd.trace {
log.Info("[downloader] Highest in DB change", "number", link.blockHeight, "hash", link.hash)
hd.logger.Info("[downloader] Highest in DB change", "number", link.blockHeight, "hash", link.hash)
}
hd.highestInDb = link.blockHeight
}
Expand Down Expand Up @@ -623,7 +623,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
}
}
if blocksToTTD > 0 {
log.Info("Estimated to reaching TTD", "blocks", blocksToTTD)
hd.logger.Info("Estimated to reaching TTD", "blocks", blocksToTTD)
}
hd.lock.RLock()
defer hd.lock.RUnlock()
Expand All @@ -636,7 +636,7 @@ func (hd *HeaderDownload) SetHeaderToDownloadPoS(hash libcommon.Hash, height uin
hd.lock.Lock()
defer hd.lock.Unlock()

log.Debug("[downloader] Set posAnchor", "blockHeight", height+1)
hd.logger.Debug("[downloader] Set posAnchor", "blockHeight", height+1)
hd.posAnchor = &Anchor{
parentHash: hash,
blockHeight: height + 1,
Expand All @@ -647,12 +647,12 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
if len(csHeaders) == 0 {
return nil, nil
}
log.Debug("[downloader] Collecting...", "from", csHeaders[0].Number, "to", csHeaders[len(csHeaders)-1].Number, "len", len(csHeaders))
hd.logger.Debug("[downloader] Collecting...", "from", csHeaders[0].Number, "to", csHeaders[len(csHeaders)-1].Number, "len", len(csHeaders))
hd.lock.Lock()
defer hd.lock.Unlock()
if hd.posAnchor == nil {
// May happen if peers are sending unrequested header packets after we've synced
log.Debug("[downloader] posAnchor is nil")
hd.logger.Debug("[downloader] posAnchor is nil")
return nil, nil
}

Expand All @@ -670,11 +670,11 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
// With this code commented out, the sync proceeds but very slowly (getting 1 header from the response of 192 headers)
/*
if hd.posAnchor.blockHeight != 1 && sh.Number != hd.posAnchor.blockHeight-1 {
log.Debug("[downloader] posAnchor", "blockHeight", hd.posAnchor.blockHeight)
hd.logger.Debug("[downloader] posAnchor", "blockHeight", hd.posAnchor.blockHeight)
//return nil, nil
}
*/
log.Debug("[downloader] Unexpected header", "hash", headerHash, "expected", hd.posAnchor.parentHash, "peerID", common.Bytes2Hex(peerId[:]))
hd.logger.Debug("[downloader] Unexpected header", "hash", headerHash, "expected", hd.posAnchor.parentHash, "peerID", common.Bytes2Hex(peerId[:]))
// Not penalise because we might have sent request twice
continue
}
Expand All @@ -689,7 +689,7 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
return nil, err
}
if hh != nil {
log.Debug("[downloader] Synced", "requestId", hd.requestId)
hd.logger.Debug("[downloader] Synced", "requestId", hd.requestId)
if headerNumber != hh.Number.Uint64()+1 {
hd.badPoSHeaders[headerHash] = header.ParentHash
return nil, fmt.Errorf("invalid PoS segment detected: invalid block number. got %d, expected %d", headerNumber, hh.Number.Uint64()+1)
Expand Down Expand Up @@ -967,11 +967,11 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe
anchor, foundAnchor := hd.anchors[sh.Hash]
if !foundParent && !foundAnchor {
if sh.Number < hd.highestInDb {
log.Debug(fmt.Sprintf("[downloader] new anchor too far in the past: %d, latest header in db: %d", sh.Number, hd.highestInDb))
hd.logger.Debug(fmt.Sprintf("[downloader] new anchor too far in the past: %d, latest header in db: %d", sh.Number, hd.highestInDb))
return false
}
if len(hd.anchors) >= hd.anchorLimit {
log.Debug(fmt.Sprintf("[downloader] too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit))
hd.logger.Debug(fmt.Sprintf("[downloader] too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit))
return false
}
}
Expand Down Expand Up @@ -999,7 +999,7 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe
} else {
// The link has not known parent, therefore it becomes an anchor, unless it is too far in the past
if sh.Number+params.FullImmutabilityThreshold < hd.highestInDb {
log.Debug("[downloader] Remove upwards", "height", link.blockHeight, "hash", link.blockHeight)
hd.logger.Debug("[downloader] Remove upwards", "height", link.blockHeight, "hash", link.blockHeight)
hd.removeUpwards(link)
return false
}
Expand Down Expand Up @@ -1028,9 +1028,9 @@ func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBloc
hd.lock.Lock()
defer hd.lock.Unlock()
hd.stats.Responses++
log.Trace("[downloader] Link queue", "size", hd.linkQueue.Len())
hd.logger.Trace("[downloader] Link queue", "size", hd.linkQueue.Len())
if hd.linkQueue.Len() > hd.linkLimit {
log.Trace("[downloader] Too many links, cutting down", "count", hd.linkQueue.Len(), "tried to add", len(csHeaders), "limit", hd.linkLimit)
hd.logger.Trace("[downloader] Too many links, cutting down", "count", hd.linkQueue.Len(), "tried to add", len(csHeaders), "limit", hd.linkLimit)
hd.pruneLinkQueue()
}
// Wake up stage loop if it is outside any of the stages
Expand Down Expand Up @@ -1289,7 +1289,7 @@ func (hd *HeaderDownload) StartPoSDownloader(
if sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
hd.UpdateRetryTime(req, currentTime, 30*time.Second /* timeout */)
log.Debug("[downloader] Sent request", "height", req.Number)
hd.logger.Debug("[downloader] Sent request", "height", req.Number)
}
}
if len(penalties) > 0 {
Expand All @@ -1312,7 +1312,7 @@ func (hd *HeaderDownload) StartPoSDownloader(
prevProgress = progress
} else if progress <= prevProgress {
diff := prevProgress - progress
log.Info("[downloader] Downloaded PoS Headers", "now", progress,
hd.logger.Info("[downloader] Downloaded PoS Headers", "now", progress,
"blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = progress
}
Expand Down
4 changes: 4 additions & 0 deletions turbo/stages/headerdownload/header_data_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)

type QueueID uint8
Expand Down Expand Up @@ -270,6 +271,7 @@ type HeaderDownload struct {
unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash
posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w
badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor
logger log.Logger
}

// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
Expand All @@ -283,6 +285,7 @@ func NewHeaderDownload(
linkLimit int,
engine consensus.Engine,
headerReader services.HeaderAndCanonicalReader,
logger log.Logger,
) *HeaderDownload {
persistentLinkLimit := linkLimit / 16
hd := &HeaderDownload{
Expand All @@ -303,6 +306,7 @@ func NewHeaderDownload(
ShutdownCh: make(chan struct{}),
headerReader: headerReader,
badPoSHeaders: make(map[common.Hash]common.Hash),
logger: logger,
}
heap.Init(&hd.persistedLinkQueue)
heap.Init(&hd.linkQueue)
Expand Down