CHANGELOG.md (5 additions, 0 deletions)
@@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

<!-- New features or capabilities -->
- Added parallel block retrieval system for DA operations, achieving up to 5x improvement in sync performance ([#381](https://github.com/evstack/ev-node/issues/381))
- Implemented concurrent worker pool (5 workers by default) for parallel DA height processing
- Added intelligent prefetching that retrieves up to 50 heights ahead
- Introduced concurrent namespace fetching for headers and data
- Added comprehensive metrics for monitoring parallel retrieval performance
- Added gRPC execution client implementation for remote execution services using Connect-RPC protocol ([#2490](https://github.com/evstack/ev-node/pull/2490))
- Added `ExecutorService` protobuf definition with InitChain, GetTxs, ExecuteTxs, and SetFinal RPCs ([#2490](https://github.com/evstack/ev-node/pull/2490))
- Added new `grpc` app for running EVNode with a remote execution layer via gRPC ([#2490](https://github.com/evstack/ev-node/pull/2490))
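The parallel retrieval entries above describe the design only at a high level: a pool of five workers, a prefetch window of roughly 50 DA heights, and the header and data namespaces fetched concurrently per height. The sketch below illustrates that general pattern; every identifier in it (`daResult`, `fetchNamespace`, `retrieveHeight`, `retrieveRange`) is hypothetical and not taken from the ev-node codebase, and the actual retriever tracked in #381 will differ in structure, ordering guarantees, and error handling.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// daResult is a hypothetical container for what one DA height yields.
type daResult struct {
	height  uint64
	headers [][]byte
	data    [][]byte
	err     error
}

// fetchNamespace stands in for a real DA client call.
func fetchNamespace(ctx context.Context, namespace string, height uint64) ([][]byte, error) {
	// A real implementation would query the DA layer here.
	return [][]byte{[]byte(fmt.Sprintf("%s@%d", namespace, height))}, nil
}

// retrieveHeight fetches the header and data namespaces for one height concurrently.
func retrieveHeight(ctx context.Context, height uint64) daResult {
	var (
		wg            sync.WaitGroup
		headers, data [][]byte
		hErr, dErr    error
	)
	wg.Add(2)
	go func() { defer wg.Done(); headers, hErr = fetchNamespace(ctx, "header", height) }()
	go func() { defer wg.Done(); data, dErr = fetchNamespace(ctx, "data", height) }()
	wg.Wait()

	res := daResult{height: height, headers: headers, data: data}
	if hErr != nil {
		res.err = hErr
	} else if dErr != nil {
		res.err = dErr
	}
	return res
}

// retrieveRange prefetches `window` heights starting at `from`, using `workers`
// goroutines, and returns results keyed by height so the caller can apply them in order.
func retrieveRange(ctx context.Context, from uint64, window, workers int) map[uint64]daResult {
	jobs := make(chan uint64, window)
	results := make(chan daResult, window)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for h := range jobs {
				results <- retrieveHeight(ctx, h)
			}
		}()
	}

	for h := from; h < from+uint64(window); h++ {
		jobs <- h
	}
	close(jobs)

	go func() {
		wg.Wait()
		close(results)
	}()

	out := make(map[uint64]daResult, window)
	for r := range results {
		out[r.height] = r
	}
	return out
}

func main() {
	// Defaults mirroring the changelog: 5 workers, a 50-height prefetch window.
	res := retrieveRange(context.Background(), 100, 50, 5)
	fmt.Println("retrieved heights:", len(res))
}
```

The channel capacities here double as a crude buffer bound; a production retriever would also need retries, in-order application, and cancellation via the context.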
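The gRPC execution client entries name four RPCs but not their shapes. As a rough orientation only, a Go interface mirroring those RPCs might look like the sketch below; the method signatures are assumptions for illustration, and the `ExecutorService` protobuf plus the generated Connect-RPC client in #2490 are the authoritative definitions.

```go
package execution

import (
	"context"
	"time"
)

// Executor is a hypothetical Go-side view of the four RPCs listed in the
// changelog (InitChain, GetTxs, ExecuteTxs, SetFinal). The real message types
// and generated Connect-RPC client in ev-node may differ from these signatures.
type Executor interface {
	// InitChain initializes the execution layer at genesis and returns the
	// initial state root.
	InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (stateRoot []byte, maxBytes uint64, err error)

	// GetTxs returns pending transactions from the execution layer.
	GetTxs(ctx context.Context) ([][]byte, error)

	// ExecuteTxs applies a block's transactions on top of prevStateRoot and
	// returns the updated state root.
	ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error)

	// SetFinal marks the given height as final on the execution layer.
	SetFinal(ctx context.Context, blockHeight uint64) error
}
```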
block/metrics.go (68 additions, 28 deletions)
@@ -63,6 +63,12 @@ type Metrics struct {
// State transition metrics
StateTransitions map[string]metrics.Counter
InvalidTransitions metrics.Counter

// Parallel retrieval metrics
ParallelRetrievalWorkers metrics.Gauge
ParallelRetrievalBufferSize metrics.Gauge
ParallelRetrievalPendingJobs metrics.Gauge
ParallelRetrievalLatency metrics.Histogram
}

// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -349,6 +355,36 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
}, labels).With(labelsAndValues...)
}

// Parallel retrieval metrics
m.ParallelRetrievalWorkers = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "parallel_retrieval_workers",
Help: "Number of active parallel retrieval workers",
}, labels).With(labelsAndValues...)

m.ParallelRetrievalBufferSize = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "parallel_retrieval_buffer_size",
Help: "Current size of the parallel retrieval result buffer",
}, labels).With(labelsAndValues...)

m.ParallelRetrievalPendingJobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "parallel_retrieval_pending_jobs",
Help: "Number of pending parallel retrieval jobs",
}, labels).With(labelsAndValues...)

m.ParallelRetrievalLatency = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "parallel_retrieval_latency_seconds",
Help: "Latency of parallel retrieval operations",
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 30},
}, labels).With(labelsAndValues...)

return m
}
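The registrations above only declare the new gauges and histogram; feeding them is left to the retrieval loop. Below is a minimal sketch of one way a worker could report its state after finishing a job, assuming it lives in the same `block` package so it can reach the `Metrics` fields; the helper name and call shape are hypothetical, not part of this PR.

```go
package block

import "time"

// recordRetrieval is a hypothetical helper showing how a retrieval worker
// could feed the new parallel retrieval metrics after finishing one DA height.
func recordRetrieval(m *Metrics, start time.Time, workers, buffered, pending int) {
	m.ParallelRetrievalWorkers.Set(float64(workers))     // active workers in the pool
	m.ParallelRetrievalBufferSize.Set(float64(buffered)) // results buffered, not yet applied
	m.ParallelRetrievalPendingJobs.Set(float64(pending)) // heights queued but not yet fetched

	m.ParallelRetrievalLatency.Observe(time.Since(start).Seconds()) // per-height retrieval latency
}
```

Because the Prometheus constructor and `NopMetrics` both satisfy the same go-kit metric interfaces, a helper like this works unchanged whether metrics are enabled or discarded.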

@@ -363,34 +399,38 @@ func NopMetrics() *Metrics {
CommittedHeight: discard.NewGauge(),

// Extended metrics
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DroppedSignals: discard.NewCounter(),
RecoverableErrors: discard.NewCounter(),
NonRecoverableErrors: discard.NewCounter(),
GoroutineCount: discard.NewGauge(),
DASubmissionAttempts: discard.NewCounter(),
DASubmissionSuccesses: discard.NewCounter(),
DASubmissionFailures: discard.NewCounter(),
DARetrievalAttempts: discard.NewCounter(),
DARetrievalSuccesses: discard.NewCounter(),
DARetrievalFailures: discard.NewCounter(),
DAInclusionHeight: discard.NewGauge(),
PendingHeadersCount: discard.NewGauge(),
PendingDataCount: discard.NewGauge(),
SyncLag: discard.NewGauge(),
HeadersSynced: discard.NewCounter(),
DataSynced: discard.NewCounter(),
BlocksApplied: discard.NewCounter(),
InvalidHeadersCount: discard.NewCounter(),
BlockProductionTime: discard.NewHistogram(),
EmptyBlocksProduced: discard.NewCounter(),
LazyBlocksProduced: discard.NewCounter(),
NormalBlocksProduced: discard.NewCounter(),
TxsPerBlock: discard.NewHistogram(),
InvalidTransitions: discard.NewCounter(),
ParallelRetrievalWorkers: discard.NewGauge(),
ParallelRetrievalBufferSize: discard.NewGauge(),
ParallelRetrievalPendingJobs: discard.NewGauge(),
ParallelRetrievalLatency: discard.NewHistogram(),
}

// Initialize maps with no-op metrics