Add metrics for loading data from storage #696
Prometheus.res

@@ -241,25 +241,6 @@ module BenchmarkCounters = {
   }
 }
-
-module PartitionBlockFetched = {
-  type labels = {chainId: int, partitionId: string}
-
-  let labelSchema = S.schema(s => {
-    chainId: s.matches(S.string->S.coerce(S.int)),
-    partitionId: s.matches(S.string),
-  })
-
-  let counter = SafeGauge.makeOrThrow(
-    ~name="partition_block_fetched",
-    ~help="The latest fetched block number for each partition",
-    ~labelSchema,
-  )
-
-  let set = (~blockNumber, ~partitionId, ~chainId) => {
-    counter->SafeGauge.handleInt(~labels={chainId, partitionId}, ~value=blockNumber)
-  }
-}
 
 let chainIdLabelsSchema = S.object(s => {
   s.field("chainId", S.string->S.coerce(S.int))
 })
@@ -440,22 +421,13 @@ module SourceGetHeightDuration = {
 }
 
 module ReorgCount = {
-  let deprecatedCounter = PromClient.Counter.makeCounter({
-    "name": "reorgs_detected",
-    "help": "Total number of reorgs detected",
-    "labelNames": ["chainId"],
-  })
-
   let gauge = SafeGauge.makeOrThrow(
     ~name="envio_reorg_count",
     ~help="Total number of reorgs detected",
     ~labelSchema=chainIdLabelsSchema,
   )
 
   let increment = (~chain) => {
-    deprecatedCounter
-    ->PromClient.Counter.labels({"chainId": chain->ChainMap.Chain.toString})
-    ->PromClient.Counter.inc
     gauge->SafeGauge.increment(~labels=chain->ChainMap.Chain.toChainId)
   }
 }
@@ -642,3 +614,79 @@ module EffectCacheCount = {
     gauge->SafeGauge.handleInt(~labels=effectName, ~value=count)
   }
 }
+
+module StorageLoad = {
+  let operationLabelsSchema = S.object(s => s.field("operation", S.string))
+
+  let timeCounter = SafeCounter.makeOrThrow(
+    ~name="envio_storage_load_time",
+    ~help="Processing time taken to load data from storage. (milliseconds)",
+    ~labelSchema=operationLabelsSchema,
+  )
+
+  let totalTimeCounter = SafeCounter.makeOrThrow(
+    ~name="envio_storage_load_total_time",
+    ~help="Cumulative time spent loading data from storage during the indexing process. (milliseconds)",
+    ~labelSchema=operationLabelsSchema,
+  )
+
+  let counter = SafeCounter.makeOrThrow(
+    ~name="envio_storage_load_count",
+    ~help="Cumulative number of successful storage load operations during the indexing process.",
+    ~labelSchema=operationLabelsSchema,
+  )
+
+  let whereSizeCounter = SafeCounter.makeOrThrow(
+    ~name="envio_storage_load_where_size",
+    ~help="Cumulative number of filter conditions ('where' items) used in storage load operations during the indexing process.",
+    ~labelSchema=operationLabelsSchema,
+  )
+
+  let sizeCounter = SafeCounter.makeOrThrow(
+    ~name="envio_storage_load_size",
+    ~help="Cumulative number of records loaded from storage during the indexing process.",
+    ~labelSchema=operationLabelsSchema,
+  )
+
+  type operationRef = {
+    mutable pendingCount: int,
+    timerRef: Hrtime.timeRef,
+  }
+  let operations = Js.Dict.empty()
+
+  let startOperation = (~operation) => {
+    switch operations->Utils.Dict.dangerouslyGetNonOption(operation) {
+    | Some(operationRef) => operationRef.pendingCount = operationRef.pendingCount + 1
+    | None =>
+      operations->Js.Dict.set(
+        operation,
+        (
+          {
+            pendingCount: 1,
+            timerRef: Hrtime.makeTimer(),
+          }: operationRef
+        ),
+      )
+    }
+    Hrtime.makeTimer()
+  }
Comment on lines +657 to +672

🛠️ Refactor suggestion

Add a failure path to prevent pendingCount leaks and skewed envio_storage_load_time

When endOperation isn't called (e.g., exceptions in the load path), operationRef.pendingCount never returns to 0. This prevents envio_storage_load_time from being emitted again for that label and leaves the operations entry alive indefinitely. Introduce a failOperation that decrements pendingCount, accounts for the elapsed time, and skips the success and size counters.

Apply this diff within the StorageLoad module:

   let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
@@
     sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
   }
+
+  /* Failure path: ensure pendingCount is decremented and time is accounted for,
+     without incrementing success counters or size. */
+  let failOperation = (timerRef, ~operation, ~whereSize) => {
+    let operationRef = operations->Js.Dict.unsafeGet(operation)
+    operationRef.pendingCount = operationRef.pendingCount - 1
+    if operationRef.pendingCount === 0 {
+      timeCounter->SafeCounter.handleInt(
+        ~labels={operation},
+        ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+      )
+      operations->Utils.Dict.deleteInPlace(operation)
+    }
+    totalTimeCounter->SafeCounter.handleInt(
+      ~labels={operation},
+      ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+    )
+    whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
+  }

Then call failOperation in the LoadLayer catch blocks before re-raising the error (see the other file's comment for an example).

Also applies to: 674-691
+
+  let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
+    let operationRef = operations->Js.Dict.unsafeGet(operation)
+    operationRef.pendingCount = operationRef.pendingCount - 1
+    if operationRef.pendingCount === 0 {
+      timeCounter->SafeCounter.handleInt(
+        ~labels={operation},
+        ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+      )
+      operations->Utils.Dict.deleteInPlace(operation)
+    }
+    totalTimeCounter->SafeCounter.handleInt(
+      ~labels={operation},
+      ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+    )
+    counter->SafeCounter.increment(~labels={operation})
+    whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
+    sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
+  }
+}

Comment on lines +677 to +684

Reviewer: Did you by any chance swap totalTimeCounter and timeCounter?

Author: Let's say we have 3 Entity.get queries running in parallel for 10ms each. totalTimeCounter is incremented with each query's own duration, so it grows by 30ms, while timeCounter is only incremented when the last pending query for that label finishes, so it grows by the 10ms of wall-clock time the batch actually took.

Reviewer: Ah ok, understood. So timeCounter is accounting for concurrency 👍🏼
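To make that distinction concrete, here is a minimal, hypothetical sketch of the accounting for three parallel loads sharing one operation label. It only uses the StorageLoad API added in this PR; the "TestEntity.get" label and the sleepMs helper are made up for illustration and are not part of the codebase.

// Hypothetical illustration only: three overlapping loads for the same operation label.
// `sleepMs` is an assumed delay helper (e.g. setTimeout wrapped in a promise).
let demo = async () => {
  let timers = [(), (), ()]->Js.Array2.map(_ =>
    Prometheus.StorageLoad.startOperation(~operation="TestEntity.get")
  )
  await sleepMs(10) // all three "queries" resolve about 10ms after they started
  timers->Js.Array2.forEach(timerRef =>
    timerRef->Prometheus.StorageLoad.endOperation(
      ~operation="TestEntity.get",
      ~whereSize=1,
      ~size=1,
    )
  )
  // After this runs:
  //   envio_storage_load_total_time += ~30ms (each call adds its own ~10ms)
  //   envio_storage_load_time       += ~10ms (wall clock while any call was pending)
  //   envio_storage_load_count      += 3
}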
LoadLayer.res
@@ -13,6 +13,8 @@ let loadById = (
   let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)
 
   let load = async idsToLoad => {
+    let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)
🛠️ Refactor suggestion

endOperation is skipped on exceptions → pendingCount leak and broken aggregated timing

The current pattern starts an operation timer but only calls endOperation on the success path. If an exception is thrown (e.g., StorageError), the Prometheus.StorageLoad.operations entry's pendingCount is never decremented. This prevents envio_storage_load_time from ever being emitted again for that operation label and leaves a dangling timerRef in the dict.

Recommend adding a failure path that decrements pendingCount and records attempt duration (without incrementing the "successful count" or "size"), then invoking it in catch blocks.

Apply this diff to add a failure handler in Prometheus.StorageLoad (see the Prometheus.res comment for details), then invoke it from both load paths' catch blocks.

Prometheus.res (adds failOperation):

 module StorageLoad = {
@@
   let endOperation = (timerRef, ~operation, ~whereSize, ~size) => {
     let operationRef = operations->Js.Dict.unsafeGet(operation)
     operationRef.pendingCount = operationRef.pendingCount - 1
     if operationRef.pendingCount === 0 {
       timeCounter->SafeCounter.handleInt(
         ~labels={operation},
         ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
       )
       operations->Utils.Dict.deleteInPlace(operation)
     }
     totalTimeCounter->SafeCounter.handleInt(
       ~labels={operation},
       ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
     )
     counter->SafeCounter.increment(~labels={operation})
     whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
     sizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=size)
   }
+
+  /* Decrement pendingCount and record attempt duration on failures without
+     incrementing success counters or size. Prevents pendingCount leaks. */
+  let failOperation = (timerRef, ~operation, ~whereSize) => {
+    let operationRef = operations->Js.Dict.unsafeGet(operation)
+    operationRef.pendingCount = operationRef.pendingCount - 1
+    if operationRef.pendingCount === 0 {
+      timeCounter->SafeCounter.handleInt(
+        ~labels={operation},
+        ~value=operationRef.timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+      )
+      operations->Utils.Dict.deleteInPlace(operation)
+    }
+    totalTimeCounter->SafeCounter.handleInt(
+      ~labels={operation},
+      ~value=timerRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis,
+    )
+    whereSizeCounter->SafeCounter.handleInt(~labels={operation}, ~value=whereSize)
+  }
 }

Then, call failOperation inside the catch branches in this file before re-raising.

ReScript (illustrative change; outside the changed lines):

// loadById catch (Lines 26-29 block)
} catch {
| Persistence.StorageError({message, reason}) => {
    Prometheus.StorageLoad.failOperation(
      timerRef,
      ~operation=key,
      ~whereSize=idsToLoad->Array.length,
    )
    reason->ErrorHandling.mkLogAndRaise(~logger=eventItem->Logging.getEventLogger, ~msg=message)
  }
}

// loadByField catch (Lines 202-215 block inside the async index => { ... })
} catch {
| Persistence.StorageError({message, reason}) => {
    Prometheus.StorageLoad.failOperation(
      timerRef,
      ~operation=key,
      ~whereSize=fieldValues->Array.length,
    )
    reason->ErrorHandling.mkLogAndRaise(
      ~logger=Logging.createChildFrom(
        ~logger=eventItem->Logging.getEventLogger,
        ~params={
          "operator": operatorCallName,
          "tableName": entityConfig.table.tableName,
          "fieldName": fieldName,
          "fieldValue": fieldValue,
        },
      ),
      ~msg=message,
    )
  }
}

If you prefer not to expand the Prometheus API, the alternative is wrapping the whole load body in try/finally and calling endOperation in finally with size=0 on failures, but that would change the semantics of the "successful operations" counter.

Also applies to: 47-51, 169-172, 226-226, 230-234
+
     // Since LoadManager.call prevents registerign entities already existing in the inMemoryStore,
     // we can be sure that we load only the new ones.
     let dbEntities = try {

@@ -41,6 +43,12 @@ let loadById = (
         ~entity=entitiesMap->Utils.Dict.dangerouslyGetNonOption(entityId),
       )
     })
+
+    timerRef->Prometheus.StorageLoad.endOperation(
+      ~operation=key,
+      ~whereSize=idsToLoad->Array.length,
+      ~size=dbEntities->Array.length,
+    )
   }
 
   loadManager->LoadManager.call(

@@ -158,6 +166,10 @@ let loadByField = (
   let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)
 
   let load = async (fieldValues: array<'fieldValue>) => {
+    let timerRef = Prometheus.StorageLoad.startOperation(~operation=key)
+
+    let size = ref(0)
+
     let indiciesToLoad = fieldValues->Js.Array2.map((fieldValue): TableIndices.Index.t => {
       Single({
         fieldName,

@@ -210,8 +222,16 @@ let loadByField = (
           ~entity=Some(entity),
         )
       })
+
+      size := size.contents + entities->Array.length
     })
     ->Promise.all
+
+    timerRef->Prometheus.StorageLoad.endOperation(
+      ~operation=key,
+      ~whereSize=fieldValues->Array.length,
+      ~size=size.contents,
+    )
  }
 
   loadManager->LoadManager.call(
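Pulling the scattered hunks together: each load function now takes a timer before hitting storage and, on the success path only, reports the operation label, the number of filter values, and the number of returned rows. Below is a condensed, hypothetical sketch of that shape, including the "always call endOperation, with size=0 on failure" alternative mentioned in the review. fetchFromDb (with its stub body) and the "TestEntity.get" label are placeholders rather than project code, and since ReScript has no finally clause, the failure path is sketched as a catch-all that re-raises.

// Placeholder standing in for the real storage query used by loadById/loadByField.
let fetchFromDb = async (ids: array<string>) => ids

// Condensed, illustrative shape of the instrumentation in this PR, plus the
// "record the attempt with size=0 on failure" alternative from the review.
let instrumentedLoad = async (idsToLoad: array<string>) => {
  let operation = "TestEntity.get" // hypothetical label; the real code passes the LoadManager key
  let timerRef = Prometheus.StorageLoad.startOperation(~operation)
  try {
    let dbEntities = await fetchFromDb(idsToLoad)
    // Success path, as added in this PR.
    timerRef->Prometheus.StorageLoad.endOperation(
      ~operation,
      ~whereSize=idsToLoad->Array.length,
      ~size=dbEntities->Array.length,
    )
    dbEntities
  } catch {
  | exn => {
      // Failure path: keeps pendingCount balanced, at the cost of counting the
      // attempt in envio_storage_load_count (the trade-off noted in the review).
      timerRef->Prometheus.StorageLoad.endOperation(
        ~operation,
        ~whereSize=idsToLoad->Array.length,
        ~size=0,
      )
      raise(exn)
    }
  }
}

As the review suggests, a dedicated failOperation in Prometheus.StorageLoad avoids that trade-off by skipping the success and size counters entirely.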
Reviewer: How does this represent cumulative time? It doesn't look like it builds on a previous time. Not sure I fully understand.

Author: We always increment the counter with the amount of time every query spent.

Reviewer: Ah sorry, I think I was confused. This is a counter and not a gauge 👍🏼 got it.