Skip to content

Commit

Permalink
core, miner: add debug logs to track chain insertion (#1166)
Browse files Browse the repository at this point in the history
* core: add debug logs for sync issue

* add more logs

* worker: add debug logs

* worker: add debug logs

* core: add more logfs

* miner: fix typo
  • Loading branch information
manav2401 authored Feb 23, 2024
1 parent cf460dc commit 7d39405
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
36 changes: 36 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,7 @@ func (bc *BlockChain) writeBlockAndSetHead(ctx context.Context, block *types.Blo
if block.ParentHash() != currentBlock.Hash() {
if err = bc.reorg(currentBlock, block); err != nil {
status = NonStatTy
log.Info("blockchain.writeBlockAndSetHead: reorg failed", "err", err)
}
}

Expand Down Expand Up @@ -1868,6 +1869,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {

defer bc.blockProcFeed.Send(false)

log.Info("[sync debug] blockchain.InsertChain", "len", len(chain), "first", chain[0].Number().Uint64(), "last", chain[len(chain)-1].Number().Uint64())

// Do a sanity check that the provided chain is actually ordered and linked.
for i := 1; i < len(chain); i++ {
block, prev := chain[i], chain[i-1]
Expand All @@ -1888,6 +1891,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer log.Info("[sync debug] blockchain.InsertChain: exiting and unlocking chain mutex")
defer bc.chainmu.Unlock()
return bc.insertChain(chain, true)
}
Expand All @@ -1906,6 +1910,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return 0, nil
}

log.Info("[sync debug] blockchain.insertChain", "setHead", setHead)

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)

Expand All @@ -1917,6 +1923,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
log.Info("[sync debug] blockchain.insertChain: fired ChainHeadEvent", "number", lastCanon.Number(), "hash", lastCanon.Hash())
}
}()
// Start the parallel header verifier
Expand All @@ -1931,13 +1938,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()

log.Info("[sync debug] blockchain.insertChain: verify header and created itetator", "err", err)

// Update the block import meter; it will just record chains we've received
// from other peers. (Note that the actual chain which gets imported would be
// quite low).
blockImportTimer.Mark(int64(len(headers)))

// Check the validity of incoming chain
isValid, err1 := bc.forker.ValidateReorg(bc.CurrentBlock(), headers)
if len(headers) == 0 {
log.Info("[sync debug] blockchain.insertChain: validate reorg, empty headers", "current", bc.CurrentBlock().Number.Uint64(), "valid", isValid, "err", err1)
} else {
log.Info("[sync debug] blockchain.insertChain: validate reorg", "current", bc.CurrentBlock().Number.Uint64(), "incoming first", headers[0].Number.Uint64(), "incoming last", headers[len(headers)-1].Number.Uint64(), "valid", isValid, "err", err1)
}
if err1 != nil {
return it.index, err1
}
Expand All @@ -1962,6 +1976,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

for block != nil && bc.skipBlock(err, it) {
reorg, err = bc.forker.ReorgNeeded(current, block.Header())
log.Info("[sync debug] blockchain.insertChain: reorg needed", "current", current.Number.Uint64(), "incoming", block.Header().Number.Uint64(), "reorg", reorg, "err", err)
if err != nil {
return it.index, err
}
Expand All @@ -1973,6 +1988,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// the external consensus engine), but in order to prevent the unnecessary
// reorgs when importing known blocks, the special case is handled here.
if block.NumberU64() > current.Number.Uint64() || bc.GetCanonicalHash(block.NumberU64()) != block.Hash() {
log.Info("[sync debug] blockchain.insertChain: reorg needed, switch to import mode", "number", block.Number(), "hash", block.Hash(), "current", current.Number.Uint64())
break
}
}
Expand Down Expand Up @@ -2005,6 +2021,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// Falls through to the block import
}

log.Info("[sync debug] blockchain.insertChain: left trimmed the chain", "err", err)

switch {
// First block is pruned
case errors.Is(err, consensus.ErrPrunedAncestor):
Expand Down Expand Up @@ -2041,6 +2059,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// ErrKnownBlock is allowed here since some known blocks
// still need re-execution to generate snapshots that are missing
case err != nil && !errors.Is(err, ErrKnownBlock):
log.Debug("Aborting block processing", "err", err)
bc.futureBlocks.Remove(block.Hash())

stats.ignored += len(it.chain)
Expand All @@ -2049,6 +2068,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

return it.index, err
}

log.Info("[sync debug] blockchain.insertChain: done acting upon error")

// No validation errors for the first block (or chain prefix skipped)
var activeState *state.StateDB
defer func() {
Expand Down Expand Up @@ -2080,6 +2102,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}

for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
log.Info("[sync debug] blockchain.insertChain: processing block", "number", block.Number().Uint64(), "hash", block.Hash(), "err", err)

// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
Expand Down Expand Up @@ -2125,6 +2149,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

log.Info("[sync debug] blockchain.insertChain: writing known block", "number", block.Number(), "hash", block.Hash())

stats.processed++

// We can assume that logs are empty here, since the only way for consecutive
Expand Down Expand Up @@ -2167,6 +2193,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent)
activeState = statedb

log.Info("[sync debug] blockchain.insertChain: done processing block", "number", block.Number().Uint64(), "hash", block.Hash(), "err", err)

if err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
Expand All @@ -2190,6 +2218,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

log.Info("[sync debug]blockchain.insertChain: done validating state", "number", block.Number().Uint64(), "hash", block.Hash(), "err", err)

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

Expand Down Expand Up @@ -2222,6 +2252,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
status, err = bc.writeBlockAndSetHead(context.Background(), block, receipts, logs, statedb, false)
}

log.Info("[sync debug] blockchain.insertChain: done writing block", "number", block.Number().Uint64(), "hash", block.Hash(), "status", status, "err", err)

followupInterrupt.Store(true)

if err != nil {
Expand Down Expand Up @@ -2287,6 +2319,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}

log.Info("[sync debug] blockchain.insertChain: done processing all blocks", "err", err)

// BOR
emitAccum()
// BOR
Expand All @@ -2310,6 +2344,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

stats.ignored += it.remaining()

log.Info("[sync debug] blockchain.insertChain: exiting", "index", it.index, "err", err)

return it.index, err
}

Expand Down
14 changes: 14 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) {
interrupt.Store(s)
}

log.Info("[sync debug] worker.newWorkLoop: about to commit", "len", len(w.newWorkCh))
interrupt = new(atomic.Int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, timestamp: timestamp, ctx: ctx, noempty: noempty}:
Expand All @@ -505,6 +506,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) {
}
timer.Reset(recommit)
w.newTxs.Store(0)
log.Info("[sync debug] worker.newWorkLoop: done commit")
}
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
Expand All @@ -529,10 +531,12 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) {
commit(false, commitInterruptNewHead)

case head := <-w.chainHeadCh:
log.Info("[sync debug] worker.newWorkLoop: received new head in chain head channel", "head", head.Block.Number().Uint64(), "hash", head.Block.Hash())
clearPending(head.Block.NumberU64())

timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)
log.Info("[sync debug] worker.newWorkLoop: done committing", "head", head.Block.Number().Uint64(), "hash", head.Block.Hash())

case <-timer.C:
// If sealing is running resubmit a new work cycle periodically to pull in
Expand Down Expand Up @@ -561,6 +565,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) {
}

case adjust := <-w.resubmitAdjustCh:
log.Info("[sync debug] worker.newWorkLoop: received resubmit adjustment", "len", len(w.resubmitAdjustCh))
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
Expand Down Expand Up @@ -600,6 +605,7 @@ func (w *worker) mainLoop(ctx context.Context) {
for {
select {
case req := <-w.newWorkCh:
log.Info("[sync debug] worker.mainLoop: received new work request", "len", len(w.newWorkCh))
if w.chainConfig.ChainID.Cmp(params.BorMainnetChainConfig.ChainID) == 0 || w.chainConfig.ChainID.Cmp(params.MumbaiChainConfig.ChainID) == 0 {
if w.eth.PeerCount() > 0 {
//nolint:contextcheck
Expand All @@ -609,6 +615,7 @@ func (w *worker) mainLoop(ctx context.Context) {
//nolint:contextcheck
w.commitWork(req.ctx, req.interrupt, req.noempty, req.timestamp)
}
log.Info("[sync debug] worker.mainLoop: done acting upon new work request")

case req := <-w.getWorkCh:
block, fees, err := w.generateWork(req.ctx, req.params)
Expand Down Expand Up @@ -1566,6 +1573,8 @@ func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, noempt
return
}

log.Info("[sync debug] worker.commitWork: done preparing work", "number", work.header.Number.Uint64())

// nolint:contextcheck
var interruptCtx = context.Background()

Expand Down Expand Up @@ -1597,10 +1606,13 @@ func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, noempt
// Fill pending transactions from the txpool into the block.
err = w.fillTransactions(ctx, interrupt, work, interruptCtx)

log.Info("[sync debug] worker.commitWork: fill transactions completed", "err", err)

switch {
case err == nil:
// The entire block is filled, decrease resubmit interval in case
// of current interval is larger than the user-specified one.
log.Info("[sync debug] worker.commitWork: sending to resubmitAdjustCh", "len", len(w.resubmitAdjustCh))
w.resubmitAdjustCh <- &intervalAdjust{inc: false}

case errors.Is(err, errBlockInterruptedByRecommit):
Expand Down Expand Up @@ -1634,6 +1646,8 @@ func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, noempt
w.current.discard()
}

log.Info("[sync debug] worker.commitWork: exiting from commitWork", "number", work.header.Number.Uint64())

w.current = work
}

Expand Down

0 comments on commit 7d39405

Please sign in to comment.