Skip to content

Commit 818c7ca

Browse files
authored
Indexing performance metrics (#545)
* Add prom metrics to measure quering performance * Update source height metric using query response * Sync chain list
1 parent ea41c42 commit 818c7ca

File tree

4 files changed

+76
-3
lines changed

4 files changed

+76
-3
lines changed

codegenerator/cli/npm/envio/src/Prometheus.res

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ module MakeSafePromMetric = (
8383
let handleInt: (t<'a>, ~labels: 'a, ~value: int) => unit
8484
let handleFloat: (t<'a>, ~labels: 'a, ~value: float) => unit
8585
let increment: (t<'a>, ~labels: 'a) => unit
86+
let incrementMany: (t<'a>, ~labels: 'a, ~value: int) => unit
8687
} => {
8788
type t<'a> = {metric: M.t, labelSchema: S.t<'a>}
8889

@@ -121,6 +122,13 @@ module MakeSafePromMetric = (
121122
->M.labels(labels->S.reverseConvertToJsonOrThrow(labelSchema))
122123
->Obj.magic
123124
)["inc"]()
125+
126+
let incrementMany = ({metric, labelSchema}: t<'a>, ~labels: 'a, ~value) =>
127+
(
128+
metric
129+
->M.labels(labels->S.reverseConvertToJsonOrThrow(labelSchema))
130+
->Obj.magic
131+
)["inc"](value)
124132
}
125133

126134
module SafeCounter = MakeSafePromMetric({
@@ -356,6 +364,30 @@ module IndexingPartitions = {
356364
}
357365
}
358366

367+
module IndexingIdleTime = {
368+
let counter = SafeCounter.makeOrThrow(
369+
~name="envio_indexing_idle_time",
370+
~help="The number of milliseconds the indexer source syncing has been idle. A high value may indicate the source sync is a bottleneck.",
371+
~labelSchema=chainIdLabelsSchema,
372+
)
373+
}
374+
375+
module IndexingSourceWaitingTime = {
376+
let counter = SafeCounter.makeOrThrow(
377+
~name="envio_indexing_source_waiting_time",
378+
~help="The number of milliseconds the indexer has been waiting for new blocks.",
379+
~labelSchema=chainIdLabelsSchema,
380+
)
381+
}
382+
383+
module IndexingQueryTime = {
384+
let counter = SafeCounter.makeOrThrow(
385+
~name="envio_indexing_query_time",
386+
~help="The number of milliseconds spent performing queries to the chain data-source.",
387+
~labelSchema=chainIdLabelsSchema,
388+
)
389+
}
390+
359391
module IndexingBufferSize = {
360392
let gauge = SafeGauge.makeOrThrow(
361393
~name="envio_indexing_buffer_size",

codegenerator/cli/npm/envio/src/sources/SourceManager.res

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
open Belt
22

3+
type sourceManagerStatus = Idle | WaitingForNewBlock | Querieng
4+
35
// Ideally the ChainFetcher name suits this better
46
// But currently the ChainFetcher module is immutable
57
// and handles both processing and fetching.
68
// So this module is to encapsulate the fetching logic only
79
// with a mutable state for easier reasoning and testing.
810
type t = {
911
sources: Utils.Set.t<Source.t>,
12+
mutable statusStart: Hrtime.timeRef,
13+
mutable status: sourceManagerStatus,
1014
maxPartitionConcurrency: int,
1115
newBlockFallbackStallTimeout: int,
1216
stalledPollingInterval: int,
@@ -66,7 +70,23 @@ let make = (
6670
newBlockFallbackStallTimeout,
6771
stalledPollingInterval,
6872
getHeightRetryInterval,
73+
statusStart: Hrtime.makeTimer(),
74+
status: Idle,
75+
}
76+
}
77+
78+
let trackNewStatus = (sourceManager: t, ~newStatus) => {
79+
let promCounter = switch newStatus {
80+
| Idle => Prometheus.IndexingIdleTime.counter
81+
| WaitingForNewBlock => Prometheus.IndexingSourceWaitingTime.counter
82+
| Querieng => Prometheus.IndexingQueryTime.counter
6983
}
84+
promCounter->Prometheus.SafeCounter.incrementMany(
85+
~labels=sourceManager.activeSource.chain->ChainMap.Chain.toChainId,
86+
~value=sourceManager.statusStart->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
87+
)
88+
sourceManager.statusStart = Hrtime.makeTimer()
89+
sourceManager.status = newStatus
7090
}
7191

7292
let fetchNext = async (
@@ -96,10 +116,12 @@ let fetchNext = async (
96116
| Some(waitingStateId) if waitingStateId >= stateId => ()
97117
| Some(_) // Case for the prev state before a rollback
98118
| None =>
119+
sourceManager->trackNewStatus(~newStatus=WaitingForNewBlock)
99120
sourceManager.waitingForNewBlockStateId = Some(stateId)
100121
let currentBlockHeight = await waitForNewBlock(~currentBlockHeight)
101122
switch sourceManager.waitingForNewBlockStateId {
102123
| Some(waitingStateId) if waitingStateId === stateId => {
124+
sourceManager->trackNewStatus(~newStatus=Idle)
103125
sourceManager.waitingForNewBlockStateId = None
104126
onNewBlock(~currentBlockHeight)
105127
}
@@ -115,6 +137,7 @@ let fetchNext = async (
115137
~concurrency=sourceManager.fetchingPartitionsCount,
116138
~chainId=sourceManager.activeSource.chain->ChainMap.Chain.toChainId,
117139
)
140+
sourceManager->trackNewStatus(~newStatus=Querieng)
118141
let _ =
119142
await queries
120143
->Array.map(q => {
@@ -125,6 +148,9 @@ let fetchNext = async (
125148
~concurrency=sourceManager.fetchingPartitionsCount,
126149
~chainId=sourceManager.activeSource.chain->ChainMap.Chain.toChainId,
127150
)
151+
if sourceManager.fetchingPartitionsCount === 0 {
152+
sourceManager->trackNewStatus(~newStatus=Idle)
153+
}
128154
})
129155
promise
130156
})

codegenerator/cli/src/config_parsing/chain_helpers.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ pub enum Network {
250250
)]
251251
MoonbaseAlpha = 1287,
252252

253-
#[subenum(NetworkWithExplorer, GraphNetwork)]
253+
#[subenum(HypersyncNetwork, NetworkWithExplorer, GraphNetwork)]
254254
Moonbeam = 1284,
255255

256256
#[subenum(GraphNetwork, NetworkWithExplorer)]
@@ -340,6 +340,9 @@ pub enum Network {
340340
#[subenum(HypersyncNetwork)]
341341
Superseed = 5330,
342342

343+
#[subenum(HypersyncNetwork)]
344+
Swell = 1923,
345+
343346
#[subenum(NetworkWithExplorer)]
344347
Taiko = 167000,
345348

@@ -522,7 +525,8 @@ impl Network {
522525
| Network::MegaethTestnet
523526
| Network::Curtis
524527
| Network::Worldchain
525-
| Network::Sonic => DEFAULT_CONFIRMED_BLOCK_THRESHOLD,
528+
| Network::Sonic
529+
| Network::Swell => DEFAULT_CONFIRMED_BLOCK_THRESHOLD,
526530
}
527531
}
528532
}
@@ -582,7 +586,7 @@ impl HypersyncNetwork {
582586
| ArbitrumSepolia | Fraxtal | Soneium | BaseSepolia | MevCommit | Merlin | Mode
583587
| MoonbaseAlpha | XdcTestnet | Morph | Harmony | Saakuru | Cyber | Superseed
584588
| MegaethTestnet | Sonic | Worldchain | Sophon | Fantom | Sepolia | Rsk | Chiliz
585-
| Lisk | Hyperliquid => Stone,
589+
| Lisk | Hyperliquid | Swell | Moonbeam => Stone,
586590
}
587591
}
588592

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,17 @@ let validatePartitionQueryResponse = (
343343
} = response
344344
let {rangeLastBlock} = reorgGuard
345345

346+
if currentBlockHeight > chainFetcher.currentBlockHeight {
347+
Prometheus.SourceHeight.set(
348+
~blockNumber=currentBlockHeight,
349+
~chainId=chainFetcher.chainConfig.chain->ChainMap.Chain.toChainId,
350+
// The currentBlockHeight from response won't necessarily
351+
// belong to the currently active source.
352+
// But for simplicity, assume it does.
353+
~sourceName=(chainFetcher.sourceManager->SourceManager.getActiveSource).name,
354+
)
355+
}
356+
346357
if Env.Benchmark.shouldSaveData {
347358
switch query.target {
348359
| Merge(_) => ()

0 commit comments

Comments
 (0)