104 changes: 76 additions & 28 deletions codegenerator/cli/npm/envio/src/Prometheus.res
@@ -241,25 +241,6 @@ module BenchmarkCounters = {
}
}

module PartitionBlockFetched = {
type labels = {chainId: int, partitionId: string}

let labelSchema = S.schema(s => {
chainId: s.matches(S.string->S.coerce(S.int)),
partitionId: s.matches(S.string),
})

let counter = SafeGauge.makeOrThrow(
~name="partition_block_fetched",
~help="The latest fetched block number for each partition",
~labelSchema,
)

let set = (~blockNumber, ~partitionId, ~chainId) => {
counter->SafeGauge.handleInt(~labels={chainId, partitionId}, ~value=blockNumber)
}
}

let chainIdLabelsSchema = S.object(s => {
s.field("chainId", S.string->S.coerce(S.int))
})
@@ -440,22 +421,13 @@ module SourceGetHeightDuration = {
}

module ReorgCount = {
let deprecatedCounter = PromClient.Counter.makeCounter({
"name": "reorgs_detected",
"help": "Total number of reorgs detected",
"labelNames": ["chainId"],
})

let gauge = SafeGauge.makeOrThrow(
~name="envio_reorg_count",
~help="Total number of reorgs detected",
~labelSchema=chainIdLabelsSchema,
)

let increment = (~chain) => {
deprecatedCounter
->PromClient.Counter.labels({"chainId": chain->ChainMap.Chain.toString})
->PromClient.Counter.inc
gauge->SafeGauge.increment(~labels=chain->ChainMap.Chain.toChainId)
}
}
@@ -642,3 +614,79 @@ module EffectCacheCount = {
gauge->SafeGauge.handleInt(~labels=effectName, ~value=count)
}
}

module StorageLoad = {
let operationLabelsSchema = S.object(s => s.field("operation", S.string))

let timeCounter = SafeCounter.makeOrThrow(
~name="envio_storage_load_time",
~help="Processing time taken to load data from storage. (milliseconds)",
~labelSchema=operationLabelsSchema,
)

let totalTimeCounter = SafeCounter.makeOrThrow(
~name="envio_storage_load_total_time",
~help="Cumulative time spent loading data from storage during the indexing process. (milliseconds)",
~labelSchema=operationLabelsSchema,
)
Comment on lines +627 to +631

Collaborator: How does this represent cumulative time? It doesn't look like it builds on a previous time. Not sure I fully understand.

Member Author: We always increment the counter with the amount of time every query spent.

Collaborator: Ah sorry, I think I was confused. This is a counter and not a gauge 👍🏼 got it.


let counter = SafeCounter.makeOrThrow(
~name="envio_storage_load_count",
~help="Cumulative number of successful storage load operations during the indexing process.",
~labelSchema=operationLabelsSchema,
)

let whereSizeCounter = SafeCounter.makeOrThrow(
~name="envio_storage_load_where_size",
~help="Cumulative number of filter conditions ('where' items) used in storage load operations during the indexing process.",
~labelSchema=operationLabelsSchema,
)

let sizeCounter = SafeCounter.makeOrThrow(
~name="envio_storage_load_size",
~help="Cumulative number of records loaded from storage during the indexing process.",
~labelSchema=operationLabelsSchema,
)

type operationRef = {
mutable pendingCount: int,
timerRef: Hrtime.timeRef,
}
let operations = Js.Dict.empty()

let startOperation = (~operation) => {
switch operations->Utils.Dict.dangerouslyGetNonOption(operation) {
| Some(operationRef) => operationRef.pendingCount = operationRef.pendingCount + 1
| None =>
operations->Js.Dict.set(
operation,
(
{
pendingCount: 1,
timerRef: Hrtime.makeTimer(),
}: operationRef
),
)
}
Hrtime.makeTimer()
}
Comment on lines +657 to +672

Contributor:
🛠️ Refactor suggestion

Add a failure path to prevent pendingCount leaks and skewed envio_storage_load_time

When endOperation isn’t called (e.g., exceptions in the load path), operationRef.pendingCount never returns to 0. This prevents envio_storage_load_time from being emitted again for that label and leaves the operations entry alive indefinitely.

Introduce a failOperation that:

  • Decrements pendingCount
  • Emits totalTimeCounter and whereSizeCounter
  • Does NOT increment counter (success) or sizeCounter
  • Emits timeCounter and deletes the operations entry when pendingCount reaches 0

Apply this diff within the StorageLoad module:

   let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
@@
     sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
   }
+
+  /* Failure path: ensure pendingCount is decremented and time is accounted for,
+     without incrementing success counters or size. */
+  let failOperation = (timerRef, ~operation, ~whereSize) => {
+    let operationRef = operations->Js.Dict.unsafeGet(operation)
+    operationRef.pendingCount = operationRef.pendingCount - 1
+    if operationRef.pendingCount === 0 {
+      timeCounter->SafeCounter.handleInt(
+        ~labels={operation},
+        ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+      )
+      operations->Utils.Dict.deleteInPlace(operation)
+    }
+    totalTimeCounter->SafeCounter.handleInt(
+      ~labels={operation},
+      ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+    )
+    whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
+  }

Then call failOperation in the LoadLayer catch blocks before re-raising the error (see the other file’s comment for an example).

Also applies to: 674-691


let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
let operationRef = operations->Js.Dict.unsafeGet(operation)
operationRef.pendingCount = operationRef.pendingCount - 1
if operationRef.pendingCount === 0 {
timeCounter->SafeCounter.handleInt(
~labels={operation},
~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
)
operations->Utils.Dict.deleteInPlace(operation)
}
totalTimeCounter->SafeCounter.handleInt(
Comment on lines +677 to +684

Collaborator: Did you by any chance swap totalTimeCounter and timeCounter?

Member Author: Let's say we have 3 Entity.get queries running in parallel for 10ms:

  • timeCounter - will increase by 10ms
  • totalTimeCounter - will increase by 30ms

(A usage sketch illustrating this is included after the end of this module below.)

Collaborator: Ah ok, understood. So timeCounter is accounting for concurrency 👍🏼

~labels={operation},
~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
)
counter->SafeCounter.increment(~labels={operation})
whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
}
}
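
To make the concurrency discussion above concrete, here is a minimal usage sketch (not part of the PR) of how three overlapping loads for the same operation label are accounted for. It assumes the StorageLoad API shown above; the runThreeParallelLoads and loadOne names are hypothetical, and the ~10ms figures come from the reviewer's example.

let runThreeParallelLoads = async (~operation, ~loadOne) => {
  let attempts = [1, 2, 3]->Js.Array2.map(async _ => {
    // Each call bumps pendingCount for this operation label; only the first call
    // creates the shared module-level timer stored in the operations dict.
    let timerRef = Prometheus.StorageLoad.startOperation(~operation)
    let entities = await loadOne()
    // Per-call accounting: totalTimeCounter grows by this call's own elapsed time
    // (~10ms per call, ≈30ms across the three calls); counter, whereSizeCounter and
    // sizeCounter likewise record per-call values.
    timerRef->Prometheus.StorageLoad.endOperation(
      ~operation,
      ~whereSize=1,
      ~size=entities->Array.length,
    )
    // The call that drops pendingCount back to 0 additionally bumps timeCounter by the
    // wall-clock span of the whole overlapping window (≈10ms), which is why timeCounter
    // accounts for concurrency while totalTimeCounter does not.
  })
  let _ = await attempts->Promise.all
}

Since all of these metrics are monotonically increasing counters rather than gauges, a dashboard would typically look at their rates, for example dividing the increase of envio_storage_load_total_time by the increase of envio_storage_load_count to get an average per-query latency.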
20 changes: 20 additions & 0 deletions codegenerator/cli/templates/static/codegen/src/LoadLayer.res
@@ -13,6 +13,8 @@ let loadById = (
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)

let load = async idsToLoad => {
let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)
Contributor:
🛠️ Refactor suggestion

⚠️ Potential issue

endOperation is skipped on exceptions → pendingCount leak and broken aggregated timing

The current pattern starts an operation timer but only calls endOperation on the success path. If an exception is thrown (e.g., StorageError), the Prometheus.StorageLoad.operations entry’s pendingCount is never decremented. This prevents envio_storage_load_time from ever being emitted again for that operation label and leaves a dangling timerRef in the dict.

Recommend adding a failure path that decrements pendingCount and records attempt duration (without incrementing the “successful count” or “size”), then invoking it in catch blocks.

Apply this diff to add a failure handler in Prometheus.StorageLoad (see Prometheus.res comment for details), then invoke it from both load paths’ catch blocks.

Prometheus.res (adds failOperation):

@@
 module StorageLoad = {
@@
   let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
     let operationRef = operations->Js.Dict.unsafeGet(operation)
     operationRef.pendingCount = operationRef.pendingCount - 1
     if operationRef.pendingCount === 0 {
       timeCounter->SafeCounter.handleInt(
         ~labels={operation},
         ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
       )
       operations->Utils.Dict.deleteInPlace(operation)
     }
     totalTimeCounter->SafeCounter.handleInt(
       ~labels={operation},
       ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
     )
     counter->SafeCounter.increment(~labels={operation})
     whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
     sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
   }
+
+  /* Decrement pendingCount and record attempt duration on failures without
+     incrementing success counters or size. Prevents pendingCount leaks. */
+  let failOperation = (timerRef, ~operation, ~whereSize) => {
+    let operationRef = operations->Js.Dict.unsafeGet(operation)
+    operationRef.pendingCount = operationRef.pendingCount - 1
+    if operationRef.pendingCount === 0 {
+      timeCounter->SafeCounter.handleInt(
+        ~labels={operation},
+        ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+      )
+      operations->Utils.Dict.deleteInPlace(operation)
+    }
+    totalTimeCounter->SafeCounter.handleInt(
+      ~labels={operation},
+      ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+    )
+    whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
+  }
 }

Then, call failOperation inside the catch branches in this file before re-raising:

ReScript (illustrative change; outside the changed lines)

// loadById catch (Lines 26-29 block)
} catch {
| Persistence.StorageError({message, reason}) => {
    Prometheus.StorageLoad.failOperation(
      timerRef,
      ~operation=key,
      ~whereSize=idsToLoad->Array.length,
    )
    reason->ErrorHandling.mkLogAndRaise(~logger=eventItem->Logging.getEventLogger, ~msg=message)
}
}

// loadByField catch (Lines 202-215 block inside the async index => { ... })
} catch {
| Persistence.StorageError({message, reason}) => {
    Prometheus.StorageLoad.failOperation(
      timerRef,
      ~operation=key,
      ~whereSize=fieldValues->Array.length,
    )
    reason->ErrorHandling.mkLogAndRaise(
      ~logger=Logging.createChildFrom(
        ~logger=eventItem->Logging.getEventLogger,
        ~params={
          "operator": operatorCallName,
          "tableName": entityConfig.table.tableName,
          "fieldName": fieldName,
          "fieldValue": fieldValue,
        },
      ),
      ~msg=message,
    )
}
}

If you prefer not to expand the Prometheus API, the alternative is wrapping the whole load body in try/finally and calling endOperation in finally with size=0 on failures, but that would change the semantics of the "successful operations" counter.

Also applies to: 47-51, 169-172, 226-226, 230-234

🤖 Prompt for AI Agents
In codegenerator/cli/templates/static/codegen/src/LoadLayer.res around line 16
(and also touching the catch blocks at ~26-29, ~169-172, ~202-215, ~226-234),
the timerRef created by Prometheus.StorageLoad.startOperation is not ended on
exceptions which leaks pendingCount and leaves a dangling timer; update the
failure paths to call Prometheus.StorageLoad.failOperation(timerRef,
~operation=key, ~whereSize=<number of items attempted>) inside each StorageError
catch before re-raising (use idsToLoad->Array.length for loadById and
fieldValues->Array.length for loadByField), and ensure the Prometheus.res change
adding failOperation is applied so these calls compile; alternatively wrap the
whole load body in try/finally and call endOperation with size=0 on failure if
you prefer not to add failOperation.


// Since LoadManager.call prevents registering entities already existing in the inMemoryStore,
// we can be sure that we load only the new ones.
let dbEntities = try {
@@ -41,6 +43,12 @@ let loadById = (
~entity=entitiesMap->Utils.Dict.dangerouslyGetNonOption(entityId),
)
})

timerRef->Prometheus.StorageLoad.endOperation(
~operation=key,
~whereSize=idsToLoad->Array.length,
~size=dbEntities->Array.length,
)
}

loadManager->LoadManager.call(
@@ -158,6 +166,10 @@ let loadByField = (
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)

let load = async (fieldValues: array<'fieldValue>) => {
let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)

let size = ref(0)

let indiciesToLoad = fieldValues->Js.Array2.map((fieldValue): TableIndices.Index.t => {
Single({
fieldName,
@@ -210,8 +222,16 @@ let loadByField = (
~entity=Some(entity),
)
})

size := size.contents + entities->Array.length
})
->Promise.all

timerRef->Prometheus.StorageLoad.endOperation(
~operation=key,
~whereSize=fieldValues->Array.length,
~size=size.contents,
)
}

loadManager->LoadManager.call(
@@ -383,16 +383,6 @@ let validatePartitionQueryResponse = (
}

if Env.Benchmark.shouldSaveData {
switch query.target {
| Merge(_) => ()
| Head
| EndBlock(_) =>
Prometheus.PartitionBlockFetched.set(
~blockNumber=latestFetchedBlockNumber,
~partitionId=query.partitionId,
~chainId=chain->ChainMap.Chain.toChainId,
)
}
Benchmark.addBlockRangeFetched(
~totalTimeElapsed=stats.totalTimeElapsed,
~parsingTimeElapsed=stats.parsingTimeElapsed->Belt.Option.getWithDefault(0),