Skip to content

Commit

Permalink
dvovk/nsync (#9306)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvovk authored Jan 25, 2024
1 parent e2b524b commit 9c8a2a5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
25 changes: 25 additions & 0 deletions diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (d *DiagnosticClient) Setup() {
d.runSegmentIndexingFinishedListener()
d.runCurrentSyncStageListener()
d.runSyncStagesListListener()
d.runBlockExecutionListener()
}

func (d *DiagnosticClient) runSnapshotListener() {
Expand Down Expand Up @@ -219,3 +220,27 @@ func (d *DiagnosticClient) runCurrentSyncStageListener() {
}
}()
}

func (d *DiagnosticClient) runBlockExecutionListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.BlockExecutionStatistics](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.BlockExecutionStatistics{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.syncStats.BlockExecution = info

if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
return
}
}
}
}()
}
19 changes: 19 additions & 0 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type SyncStatistics struct {
SyncStages SyncStages `json:"syncStages"`
SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"`
SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"`
BlockExecution BlockExecutionStatistics `json:"blockExecution"`
}

type SnapshotDownloadStatistics struct {
Expand Down Expand Up @@ -90,6 +91,24 @@ type SyncStages struct {
CurrentStage uint `json:"currentStage"`
}

type BlockExecutionStatistics struct {
From uint64 `json:"from"`
To uint64 `json:"to"`
BlockNumber uint64 `json:"blockNumber"`
BlkPerSec float64 `json:"blkPerSec"`
TxPerSec float64 `json:"txPerSec"`
MgasPerSec float64 `json:"mgasPerSec"`
GasState float64 `json:"gasState"`
Batch uint64 `json:"batch"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
TimeElapsed float64 `json:"timeElapsed"`
}

func (ti BlockExecutionStatistics) Type() Type {
return TypeOf(ti)
}

func (ti SnapshotDownloadStatistics) Type() Type {
return TypeOf(ti)
}
Expand Down
27 changes: 24 additions & 3 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
Expand Down Expand Up @@ -408,6 +409,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
logBlock := stageProgress
logTx, lastLogTx := uint64(0), uint64(0)
logTime := time.Now()
startTime := time.Now()
var gas uint64 // used for logs
var currentStateGas uint64 // used for batch commits of state
// Transform batch_size limit into Ggas
Expand Down Expand Up @@ -534,7 +536,7 @@ Loop:
select {
default:
case <-logEvery.C:
logBlock, logTx, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, logTx, lastLogTx, gas, float64(currentStateGas)/float64(gasState), batch, logger)
logBlock, logTx, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, logTx, lastLogTx, gas, float64(currentStateGas)/float64(gasState), batch, logger, s.BlockNumber, to, startTime)
gas = 0
txc.Tx.CollectMetrics()
syncMetrics[stages.Execution].SetUint64(blockNum)
Expand Down Expand Up @@ -650,7 +652,7 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl
}

func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64,
gasState float64, batch kv.PendingMutations, logger log.Logger) (uint64, uint64, time.Time) {
gasState float64, batch kv.PendingMutations, logger log.Logger, from uint64, to uint64, startTime time.Time) (uint64, uint64, time.Time) {
currentTime := time.Now()
interval := currentTime.Sub(prevTime)
speed := float64(currentBlock-prevBlock) / (float64(interval) / float64(time.Second))
Expand All @@ -666,10 +668,29 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current
"Mgas/s", fmt.Sprintf("%.1f", speedMgas),
"gasState", fmt.Sprintf("%.2f", gasState),
}

batchSize := 0

if batch != nil {
logpairs = append(logpairs, "batch", common.ByteCount(uint64(batch.BatchSize())))
batchSize = batch.BatchSize()
logpairs = append(logpairs, "batch", common.ByteCount(uint64(batchSize)))
}
logpairs = append(logpairs, "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))

diagnostics.Send(diagnostics.BlockExecutionStatistics{
From: from,
To: to,
BlockNumber: currentBlock,
BlkPerSec: speed,
TxPerSec: speedTx,
MgasPerSec: speedMgas,
GasState: gasState,
Batch: uint64(batchSize),
Alloc: m.Alloc,
Sys: m.Sys,
TimeElapsed: time.Since(startTime).Round(time.Second).Seconds(),
})

logger.Info(fmt.Sprintf("[%s] Executed blocks", logPrefix), logpairs...)

return currentBlock, currentTx, currentTime
Expand Down
1 change: 0 additions & 1 deletion eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (s *Sync) NextStage() {
return
}
s.currentStage++

isDiagEnabled := diagnostics.TypeOf(diagnostics.CurrentSyncStage{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.CurrentSyncStage{Stage: s.currentStage})
Expand Down

0 comments on commit 9c8a2a5

Please sign in to comment.