Skip to content

Commit 0723a81

Browse files
committed
Add metrics for loading data from storage
1 parent a021fd1 commit 0723a81

File tree

3 files changed

+96
-38
lines changed

3 files changed

+96
-38
lines changed

codegenerator/cli/npm/envio/src/Prometheus.res

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -241,25 +241,6 @@ module BenchmarkCounters = {
241241
}
242242
}
243243

244-
module PartitionBlockFetched = {
245-
type labels = {chainId: int, partitionId: string}
246-
247-
let labelSchema = S.schema(s => {
248-
chainId: s.matches(S.string->S.coerce(S.int)),
249-
partitionId: s.matches(S.string),
250-
})
251-
252-
let counter = SafeGauge.makeOrThrow(
253-
~name="partition_block_fetched",
254-
~help="The latest fetched block number for each partition",
255-
~labelSchema,
256-
)
257-
258-
let set = (~blockNumber, ~partitionId, ~chainId) => {
259-
counter->SafeGauge.handleInt(~labels={chainId, partitionId}, ~value=blockNumber)
260-
}
261-
}
262-
263244
let chainIdLabelsSchema = S.object(s => {
264245
s.field("chainId", S.string->S.coerce(S.int))
265246
})
@@ -440,22 +421,13 @@ module SourceGetHeightDuration = {
440421
}
441422

442423
module ReorgCount = {
443-
let deprecatedCounter = PromClient.Counter.makeCounter({
444-
"name": "reorgs_detected",
445-
"help": "Total number of reorgs detected",
446-
"labelNames": ["chainId"],
447-
})
448-
449424
let gauge = SafeGauge.makeOrThrow(
450425
~name="envio_reorg_count",
451426
~help="Total number of reorgs detected",
452427
~labelSchema=chainIdLabelsSchema,
453428
)
454429

455430
let increment = (~chain) => {
456-
deprecatedCounter
457-
->PromClient.Counter.labels({"chainId": chain->ChainMap.Chain.toString})
458-
->PromClient.Counter.inc
459431
gauge->SafeGauge.increment(~labels=chain->ChainMap.Chain.toChainId)
460432
}
461433
}
@@ -642,3 +614,79 @@ module EffectCacheCount = {
642614
gauge->SafeGauge.handleInt(~labels=effectName, ~value=count)
643615
}
644616
}
617+
618+
module StorageLoad = {
619+
let operationLabelsSchema = S.object(s => s.field("operation", S.string))
620+
621+
let timeCounter = SafeCounter.makeOrThrow(
622+
~name="envio_storage_load_time",
623+
~help="Processing time taken to load data from storage. (milliseconds)",
624+
~labelSchema=operationLabelsSchema,
625+
)
626+
627+
let totalTimeCounter = SafeCounter.makeOrThrow(
628+
~name="envio_storage_load_total_time",
629+
~help="Cumulative time spent loading data from storage during the indexing process. (milliseconds)",
630+
~labelSchema=operationLabelsSchema,
631+
)
632+
633+
let counter = SafeCounter.makeOrThrow(
634+
~name="envio_storage_load_count",
635+
~help="Cumulative number of successful storage load operations during the indexing process.",
636+
~labelSchema=operationLabelsSchema,
637+
)
638+
639+
let whereSizeCounter = SafeCounter.makeOrThrow(
640+
~name="envio_storage_load_where_size",
641+
~help="Cumulative number of filter conditions ('where' items) used in storage load operations during the indexing process.",
642+
~labelSchema=operationLabelsSchema,
643+
)
644+
645+
let sizeCounter = SafeCounter.makeOrThrow(
646+
~name="envio_storage_load_size",
647+
~help="Cumulative number of records loaded from storage during the indexing process.",
648+
~labelSchema=operationLabelsSchema,
649+
)
650+
651+
type operationRef = {
652+
mutable pendingCount: int,
653+
timerRef: Hrtime.timeRef,
654+
}
655+
let operations = Js.Dict.empty()
656+
657+
let startOperation = (~operation) => {
658+
switch operations->Utils.Dict.dangerouslyGetNonOption(operation) {
659+
| Some(operationRef) => operationRef.pendingCount = operationRef.pendingCount + 1
660+
| None =>
661+
operations->Js.Dict.set(
662+
operation,
663+
(
664+
{
665+
pendingCount: 1,
666+
timerRef: Hrtime.makeTimer(),
667+
}: operationRef
668+
),
669+
)
670+
}
671+
Hrtime.makeTimer()
672+
}
673+
674+
let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
675+
let operationRef = operations->Js.Dict.unsafeGet(operation)
676+
operationRef.pendingCount = operationRef.pendingCount - 1
677+
if operationRef.pendingCount === 0 {
678+
timeCounter->SafeCounter.handleInt(
679+
~labels={operation},
680+
~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
681+
)
682+
operations->Utils.Dict.deleteInPlace(operation)
683+
}
684+
totalTimeCounter->SafeCounter.handleInt(
685+
~labels={operation},
686+
~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
687+
)
688+
counter->SafeCounter.increment(~labels={operation})
689+
whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
690+
sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
691+
}
692+
}

codegenerator/cli/templates/static/codegen/src/LoadLayer.res

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ let loadById = (
1313
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)
1414

1515
let load = async idsToLoad => {
16+
let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)
17+
1618
// Since LoadManager.call prevents registerign entities already existing in the inMemoryStore,
1719
// we can be sure that we load only the new ones.
1820
let dbEntities = try {
@@ -41,6 +43,12 @@ let loadById = (
4143
~entity=entitiesMap->Utils.Dict.dangerouslyGetNonOption(entityId),
4244
)
4345
})
46+
47+
timerRef->Prometheus.StorageLoad.endOperation(
48+
~operation=key,
49+
~whereSize=idsToLoad->Array.length,
50+
~size=dbEntities->Array.length,
51+
)
4452
}
4553

4654
loadManager->LoadManager.call(
@@ -158,6 +166,10 @@ let loadByField = (
158166
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)
159167

160168
let load = async (fieldValues: array<'fieldValue>) => {
169+
let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)
170+
171+
let size = ref(0)
172+
161173
let indiciesToLoad = fieldValues->Js.Array2.map((fieldValue): TableIndices.Index.t => {
162174
Single({
163175
fieldName,
@@ -210,8 +222,16 @@ let loadByField = (
210222
~entity=Some(entity),
211223
)
212224
})
225+
226+
size := size.contents + entities->Array.length
213227
})
214228
->Promise.all
229+
230+
timerRef->Prometheus.StorageLoad.endOperation(
231+
~operation=key,
232+
~whereSize=fieldValues->Array.length,
233+
~size=size.contents,
234+
)
215235
}
216236

217237
loadManager->LoadManager.call(

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -383,16 +383,6 @@ let validatePartitionQueryResponse = (
383383
}
384384

385385
if Env.Benchmark.shouldSaveData {
386-
switch query.target {
387-
| Merge(_) => ()
388-
| Head
389-
| EndBlock(_) =>
390-
Prometheus.PartitionBlockFetched.set(
391-
~blockNumber=latestFetchedBlockNumber,
392-
~partitionId=query.partitionId,
393-
~chainId=chain->ChainMap.Chain.toChainId,
394-
)
395-
}
396386
Benchmark.addBlockRangeFetched(
397387
~totalTimeElapsed=stats.totalTimeElapsed,
398388
~parsingTimeElapsed=stats.parsingTimeElapsed->Belt.Option.getWithDefault(0),

0 commit comments

Comments
 (0)