@DZakh (Member) commented Sep 22, 2025

Update batch creation logic to include full block events in a batch, even if they exceed the batch size.

  • Also refactored the buffer logic: items are now kept in ascending order and are no longer extracted with .pop. This should improve performance for indexers processing more than 100k events per second.
  • Removed _progress_log_index column from envio_chains since it's not needed
  • Still need processingFilters for rollback
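
The batching rule described above can be sketched as follows. This is a minimal TypeScript illustration, not the ReScript code in Batch.res; BufferItem, takeBatch, and the item shape are hypothetical names introduced only for this example.

```typescript
// Hypothetical item shape; the real buffer items carry more fields.
type BufferItem = { blockNumber: number; logIndex: number };

// Reads an ascending buffer from the front until the target is reached,
// then keeps going until the block in progress is complete.
// Returns the batch and the remaining buffer without mutating the input.
function takeBatch(
  buffer: readonly BufferItem[],
  batchSizeTarget: number,
): { batch: BufferItem[]; rest: BufferItem[] } {
  let end = 0;
  while (end < buffer.length) {
    const underTarget = end < batchSizeTarget;
    const sameBlockAsPrevious =
      end > 0 && buffer[end].blockNumber === buffer[end - 1].blockNumber;
    // Stop only at a block boundary once the target has been reached.
    if (!underTarget && !sameBlockAsPrevious) break;
    end++;
  }
  // One slice per batch instead of one .pop call per item.
  return { batch: buffer.slice(0, end), rest: buffer.slice(end) };
}
```

With blocks [1, 1, 2, 2, 2, 3] and a target of 3, the sketch returns five items: the target falls inside block 2, so the rest of block 2 is included and block 3 stays in the buffer.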

Summary by CodeRabbit

  • New Features
    • Configurable batch size via config and BATCH_SIZE env var; propagated through batch processing.
    • Batch responses now include creation timestamp.
  • Performance
    • More efficient, per-chain batching and readiness checks for ordered/unordered processing.
  • Breaking Changes
    • Environment variable renamed from MAX_BATCH_SIZE (defaulted) to optional BATCH_SIZE.
    • Public batch output field renamed (fetch states now returned as updated states).
  • Database
    • Removed obsolete progress log index column from chains schema.
  • Refactor
    • Internal queue renamed to buffer across fetch state handling.
  • Tests
    • Updated test suites to new batching and buffer semantics.

@coderabbitai bot (Contributor) commented Sep 22, 2025

Walkthrough

Refactors batching and fetch-state internals: queue → buffer model, new readiness helpers, per-chain batch sizing, and target-based batch preparation. Removes progress log index from schema and resume paths. Introduces config-driven batchSize, propagates through ChainManager/GlobalState/ChainFetcher, updates Batch API/shape, utilities, and tests accordingly.

Changes

Cohort: Batching API and flow
File(s): codegenerator/cli/npm/envio/src/Batch.res
Summary: Replaces item ordering with chain selection using per-chain counters; adds readiness helpers; renames/rewrites pop* to prepare* with batchSizeTarget and mutBatchSizePerChain; adds creationTimeMs; updates public fields (progressedChain, t).

Cohort: Fetch state: queue → buffer
File(s): codegenerator/cli/npm/envio/src/FetchState.res
Summary: Converts queue to buffer (earliest→latest ordering), removes option dcsToStore, adds buffer-centric helpers (getTimestampAt, hasReadyItem, getReadyItemsCount); updates all logic and signatures to buffer/dcsToStore: [].

Cohort: Chain manager and global state
File(s): codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res, codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
Summary: Makes target buffer sizing config-aware; switches to batchSizeTarget; adopts new Batch.prepare* APIs; propagates updatedFetchStates and creationTimeMs; drops maxBatchSize from global state; updates metrics wiring.

Cohort: Chain fetcher
File(s): codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
Summary: Removes processingFilters parameter/usage; adjusts makeFromDbState to compute progressBlockNumber; returns processingFilters=None.

Cohort: Config and env wiring
File(s): codegenerator/cli/templates/static/codegen/src/Config.res, codegenerator/cli/templates/static/codegen/src/Env.res, codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs
Summary: Adds Config.t.batchSize and plumbs via Config.make; replaces Env MAX_BATCH_SIZE with optional batchSize; passes ~batchSize from Env to Config.make.

Cohort: Persistence and schema
File(s): codegenerator/cli/npm/envio/src/Persistence.res, codegenerator/cli/npm/envio/src/db/InternalTable.res
Summary: Removes progress log index from schema/types/queries and cleanup; resume checkpoints now use progressBlockNumber directly.

Cohort: Utilities
File(s): codegenerator/cli/npm/envio/src/Utils.res
Summary: Adds Dict.dangerouslyGetByIntNonOption and Array.clearInPlace.

Cohort: Tests update
File(s): scenarios/test_codegen/test/ChainManager_test.res, .../lib_tests/FetchState_test.res, .../lib_tests/PgStorage_test.res, .../lib_tests/SourceManager_test.res, .../lib_tests/FetchState_onBlock_test.res
Summary: Rename maxBatchSize→batchSizeTarget; propagate updatedFetchStates; migrate queue→buffer and dcsToStore None→[]; drop getOrderedNextItem suite; update SQL expectations removing _progress_log_index.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App as GlobalState
  participant CM as ChainManager
  participant CF as ChainFetcher (per chain)
  participant B as Batch
  participant FS as FetchState (buffer)

  App->>CM: createBatch(~batchSizeTarget=config.batchSize)
  CM->>CF: getFetchStates()
  CF-->>CM: ChainMap<FetchState>
  CM->>B: prepareOrderedBatch/prepareUnorderedBatch(~batchSizeTarget, ~fetchStates, ~mutBatchSizePerChain)
  B->>FS: hasOrderedReadyItem/hasUnorderedReadyItem
  B->>FS: getTimestampAt / getReadyItemsCount
  FS-->>B: readiness and counts
  B-->>CM: {items, progressedChains, updatedFetchStates, dcsToStoreByChainId, creationTimeMs}
  CM-->>App: Batch result
  App->>App: dispatch UpdateQueues(updatedFetchStates)
  App->>Metrics: set ProcessingMaxBatchSize=config.batchSize
sequenceDiagram
  autonumber
  participant DB as InternalTable.Chains
  participant Pers as Persistence
  note over DB,Pers: Progress tracking (schema/resume)
  DB-x DB: Remove _progress_log_index field
  Pers->>Pers: checkpoints := progressBlockNumber

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • JonoPrest
  • JasoonS

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title Check (✅ Passed): The title clearly summarizes the primary change of ensuring full block events are batched even if they exceed the batch size, which matches the objective to update batch creation logic. It is concise, descriptive, and focuses on the main feature without extraneous details. A teammate scanning the history will immediately understand the key impact of the PR.
  • Docstring Coverage (✅ Passed): No functions found in the changes. Docstring coverage check skipped.
📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 788ccae and b5adcec.

📒 Files selected for processing (2)
  • codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/Env.res (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs
  • codegenerator/cli/templates/static/codegen/src/Env.res
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/templates/static/codegen/src/Env.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/templates/static/codegen/src/Env.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (2)
codegenerator/cli/templates/static/codegen/src/Env.res (1)

15-15: Ensure generated template handles optional batchSize correctly
The change to MAX_BATCH_SIZE parsing uses S.option(S.int), so batchSize may now be None. Since this is a static template, generate a new codegen project and confirm that Config.t or its consumers unwrap the option or apply a sensible default when batchSize is None.
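
One way to apply a sensible default when batchSize is None, sketched in TypeScript rather than ReScript. resolveBatchSize and FALLBACK_BATCH_SIZE are hypothetical names, and the value 5000 is a placeholder assumption, not a confirmed default of the project.

```typescript
// Placeholder fallback; an assumption for illustration only.
const FALLBACK_BATCH_SIZE = 5000;

// Resolves an optional BATCH_SIZE string (e.g. from process.env) to a number.
// Missing or blank input yields the fallback; non-integer input is rejected.
function resolveBatchSize(
  raw: string | undefined,
  fallback: number = FALLBACK_BATCH_SIZE,
): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  if (!Number.isInteger(parsed)) {
    throw new Error(`BATCH_SIZE must be an integer, got "${raw}"`);
  }
  return parsed;
}
```

Keeping the fallback in a single function means the optional value is unwrapped once, so later consumers can treat the batch size as a plain number.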

codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs (1)

74-74: Drop Env.batchSize verification in codegen templates. The optional ~batchSize=?Env.batchSize correctly targets the generated project’s Env module, which the user must update with batchSize.

Likely an incorrect or invalid review comment.


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
scenarios/test_codegen/test/ChainManager_test.res (1)

157-157: Outdated arg name: createBatch now expects ~batchSizeTarget

ChainManager.createBatch was migrated to ~batchSizeTarget. Update this call site, otherwise the test won’t compile.

Apply:

-          let eventsInBlock = ChainManager.createBatch(chainManager, ~maxBatchSize=10000)
+          let eventsInBlock = ChainManager.createBatch(chainManager, ~batchSizeTarget=10000)
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

2385-2388: Rename arg: filterAndSortForUnorderedBatch now takes ~batchSizeTarget

Tests still call ~maxBatchSize, which will fail after the API change.

Apply:

-      let prepared = FetchState.filterAndSortForUnorderedBatch(
-        [fsLate, fsExcluded, fsEarly],
-        ~maxBatchSize=3,
-      )
+      let prepared = FetchState.filterAndSortForUnorderedBatch(
+        [fsLate, fsExcluded, fsEarly],
+        ~batchSizeTarget=3,
+      )
-    let prepared = FetchState.filterAndSortForUnorderedBatch(
-      [fsHalfEarlier, fsFullLater],
-      ~maxBatchSize=2,
-    )
+    let prepared = FetchState.filterAndSortForUnorderedBatch(
+      [fsHalfEarlier, fsFullLater],
+      ~batchSizeTarget=2,
+    )
-    let prepared = FetchState.filterAndSortForUnorderedBatch(
-      [fsHalfEarlier, fsExactFull],
-      ~maxBatchSize=2,
-    )
+    let prepared = FetchState.filterAndSortForUnorderedBatch(
+      [fsHalfEarlier, fsExactFull],
+      ~batchSizeTarget=2,
+    )

Also applies to: 2429-2433, 2472-2475

codegenerator/cli/npm/envio/src/FetchState.res (1)

1305-1327: Replace all remaining ~maxBatchSize invocations with ~batchSizeTarget

  • scenarios/test_codegen/test/ChainManager_test.res (e.g. line 157) and scenarios/test_codegen/test/lib_tests/FetchState_test.res (lines 2387, 2431, 2474)
  • codegenerator/cli/npm/envio/src/Prometheus.res (line 552)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (line 53)
  • internal_docs/EventFetchers.md signature (if kept in sync)
codegenerator/cli/npm/envio/src/Batch.res (2)

74-96: Batch cap truncates a block; overflow to finish the current block

With a strict items.length < batchSizeTarget guard, the final block can be split across batches, contradicting “Always include all block events to a batch.” Allow controlled overflow to drain the current block (same chainId + blockNumber) after hitting the target.

Apply:

 let popOrderedBatchItems = (
   ~batchSizeTarget,
   ~fetchStates: ChainMap.t<FetchState.t>,
   ~sizePerChain: dict<int>,
 ) => {
   let items = []
 
-  let rec loop = () =>
-    if items->Array.length < batchSizeTarget {
-      switch fetchStates->getOrderedNextItem {
-      | Some({earliestEvent, chain}) =>
-        switch earliestEvent {
-        | NoItem(_) => ()
-        | Item({item, popItemOffQueue}) => {
-            popItemOffQueue()
-            items->Js.Array2.push(item)->ignore
-            sizePerChain->Utils.Dict.incrementByInt(chain->ChainMap.Chain.toChainId)
-            loop()
-          }
-        }
-      | _ => ()
-      }
-    }
-  loop()
+  let rec loop = (lastKey: option<(int, int)>) => {
+    switch fetchStates->getOrderedNextItem {
+    | Some({earliestEvent, chain}) =>
+      switch earliestEvent {
+      | NoItem(_) => ()
+      | Item({item, popItemOffQueue}) => {
+          // Derive block key from comparator: (ts, chainId, blockNumber, logIndex)
+          let (_, chainId, blockNumber, _) =
+            earliestEvent->getQueueItemComparitor(~chain)
+          let curr = items->Array.length
+          let canTake =
+            curr < batchSizeTarget ||
+            switch lastKey {
+            | Some((lastChainId, lastBlock)) => lastChainId === chainId && lastBlock === blockNumber
+            | None => true
+            }
+          if canTake {
+            popItemOffQueue()
+            items->Js.Array2.push(item)->ignore
+            sizePerChain->Utils.Dict.incrementByInt(chainId)
+            loop(Some((chainId, blockNumber)))
+          } else {
+            ()
+          }
+        }
+      }
+    | _ => ()
+    }
+  }
+  loop(None)
 
   items
 }

101-149: Unordered path can still split a block at the boundary

Same issue as ordered: once batchSizeTarget is reached, inner per‑chain loop stops mid‑block. To satisfy the PR goal, continue draining the current block for that chain even if it overflows the target. Suggest extracting a helper that returns a (chainId, blockNumber) key from a queueItem, then gate the inner loop with “batchSize < target OR sameBlockAsLastPoppedForThisChain”.

I can draft a small helper signature and refactor if you confirm where to obtain blockNumber from queueItem in this module (EventUtils or FetchState).
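
The gate described for the unordered path could look roughly like the sketch below. It is written in TypeScript for illustration, not in ReScript, and QueueItem, blockKey, and drainChain are hypothetical names; where blockNumber actually lives on a queue item in this module is not confirmed here.

```typescript
// Hypothetical item shape, reduced to the fields the gate needs.
type QueueItem = { chainId: number; blockNumber: number };

// Key identifying the block an item belongs to.
const blockKey = (item: QueueItem): string =>
  `${item.chainId}:${item.blockNumber}`;

// Moves items of one chain (ascending order) into the shared batch.
// Takes an item while the batch is under target, or when the item belongs
// to the same block as the last item taken from this chain.
// Returns how many items were taken from this chain.
function drainChain(
  items: readonly QueueItem[],
  batch: QueueItem[],
  batchSizeTarget: number,
): number {
  let taken = 0;
  let lastKey: string | undefined;
  for (const item of items) {
    const key = blockKey(item);
    const canTake = batch.length < batchSizeTarget || key === lastKey;
    if (!canTake) break;
    batch.push(item);
    lastKey = key;
    taken++;
  }
  return taken;
}
```

Because lastKey starts undefined, a chain never begins a new block once the shared batch is already at the target; overflow happens only to finish a block that this call started.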

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)

115-131: Rename remaining call sites from ~maxBatchSize → ~batchSizeTarget

One test and the docs still use the old param name; update them (GlobalState already uses ~batchSizeTarget at codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res:902).

  • scenarios/test_codegen/test/ChainManager_test.res:157 — change ChainManager.createBatch(chainManager, ~maxBatchSize=10000) to ChainManager.createBatch(chainManager, ~batchSizeTarget=10000).
  • internal_docs/EventFetchers.md:15 — update the createBatch signature/docs (remove or rename min/maxBatchSize to reflect the new single ~batchSizeTarget param).
🧹 Nitpick comments (12)
codegenerator/cli/npm/envio/src/Utils.res (1)

191-194: Use of %raw to mutate array length — acceptable workaround, but document intent and risks.

Directly setting arr.length via %raw bypasses type checks and our guideline to avoid %raw when the type is known. Given the spliceInPlace ignore-return bug noted in this file, this is pragmatic for tests; please add a brief ocaml.doc comment referencing the compiler issue and clarifying this is intentional until the bug is resolved. Consider renaming to clearInPlaceUnsafe to signal the escape hatch nature.

scenarios/test_codegen/test/helpers/Mock.res (2)

505-528: Wrapper around optional handler: small helper would reduce duplication and clarify preload behavior.

The inline switch creates an Internal.handler or None and guards on context.isPreload. Extract to a tiny helper to keep tests concise and make preload behavior explicit.

-                            handler: switch item.handler {
-                            | Some(handler) =>
-                              (
-                                ({context} as args) => {
-                                  // We don't want preload optimization for the tests
-                                  if context.isPreload {
-                                    Promise.resolve()
-                                  } else {
-                                    handler(args)
-                                  }
-                                }
-                              )->(
-                                Utils.magic: Types.HandlerTypes.loader<unit, unit> => option<
-                                  Internal.handler,
-                                >
-                              )
-                            | None => None
-                            },
+                            handler: (
+                              switch item.handler {
+                              | Some(h) =>
+                                (({context} as args) =>
+                                  context.isPreload ? Promise.resolve() : h(args)
+                                )
+                                ->(Utils.magic: Types.HandlerTypes.loader<unit, unit> => option<Internal.handler>)
+                              | None => None
+                              }
+                            ),

524-528: contractRegister cast: annotate intent.

The magic cast from option<contractRegister> to option<Internal.contractRegister> is standard for these tests; add a short comment that this is a test-only conversion to avoid %raw/object poking.

scenarios/test_codegen/test/rollback/Rollback_test.res (2)

947-949: Clearing recorded calls mid-test: good use of clearInPlace; add a tiny helper to avoid repetition.

You clear getItemsOrThrowCalls in-place to isolate the next phase. Consider a small local helper (e.g., let resetCalls = arr => arr->Utils.Array.clearInPlace) reused across tests to keep intent obvious.


960-970: Multiple microtask flushes: consider a helper.

Repeated await Utils.delay(0) calls are a bit noisy. A local flushMicrotasks helper that awaits Promise.resolve() twice keeps tests terse and intention-revealing.

scenarios/test_codegen/test/ChainManager_test.res (3)

195-196: Fix test assertion strength: advance lastEvent across batches

The recursive call reuses the initial lastEvent, so the ordering assertion doesn’t evolve. Use the last item of the current batch.

Apply:

-            testThatCreatedEventsAreOrderedCorrectly(nextChainManager, lastEvent)
+            let newLastEvent = items->Utils.Array.last->Option.getExn
+            testThatCreatedEventsAreOrderedCorrectly(nextChainManager, newLastEvent)

51-53: Avoid potential infinite loop in random time generator

blockTime can be 0, risking non‑progress in the while loop. Force a minimum of 1.

Apply:

-      let blockTime = getRandomInt(0, 2 * averageBlockTime)
+      let blockTime = getRandomInt(1, 2 * averageBlockTime)

2119-2119: Remove stray console log in tests

Js.log(updatedFetchState) adds noise to CI output.

Apply:

-    Js.log(updatedFetchState)
+    /* removed noisy log */
codegenerator/cli/templates/static/codegen/src/Config.res (1)

130-131: Guard batchSize against misconfiguration (0 or negative)

A 0 batchSize can cause downstream indexing/array access issues. Clamp to ≥1.

Apply:

 ) => {
-  let chainMap =
+  let batchSize = Pervasives.max(batchSize, 1)
+  let chainMap =
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

2119-2119: Remove stray console log

Drop Js.log(updatedFetchState) to keep tests quiet.

Apply:

-    Js.log(updatedFetchState)
+    /* removed noisy log */
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (1)

910-922: Minor: compute threshold only when updates exist

You compute shouldEnterReorgThreshold unconditionally; optional micro‑opt is to only evaluate when any fetch state actually changed. Not blocking.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)

9-16: Target buffer formula: double‑check intent under many chains

The formula drops to 1×batchSize if activeChainsCount > 3. Validate this is intentional (memory cap) and won’t starve prefetch when e.g. 10 chains are active.

If the intent is “min(3, activeChainsCount) batches worth of buffer”, consider:

-    config.batchSize * (activeChainsCount > targetBatchesInBuffer ? 1 : targetBatchesInBuffer)
+    config.batchSize * Pervasives.min(activeChainsCount, targetBatchesInBuffer)
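
To see how the two formulas differ, here is a small TypeScript comparison. It assumes targetBatchesInBuffer is 3 (implied by the "> 3" wording above, not confirmed) and uses a batch size of 5000 purely as an example value.

```typescript
// Assumed value; implied by the comment above, not confirmed.
const targetBatchesInBuffer = 3;

// Formula from the removed line of the diff.
const currentTarget = (batchSize: number, activeChains: number): number =>
  batchSize * (activeChains > targetBatchesInBuffer ? 1 : targetBatchesInBuffer);

// Formula from the added line of the diff.
const suggestedTarget = (batchSize: number, activeChains: number): number =>
  batchSize * Math.min(activeChains, targetBatchesInBuffer);

// Print both targets side by side for a few chain counts.
for (const chains of [1, 3, 4, 10]) {
  console.log(chains, currentTarget(5000, chains), suggestedTarget(5000, chains));
}
```

For 4 or 10 chains the current formula gives 5000 while the suggested one gives 15000. The suggested formula also changes the small-chain case: with a single active chain it gives 5000 instead of 15000, which is worth weighing before adopting it.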
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 8be862b and cfd5531.

📒 Files selected for processing (16)
  • codegenerator/cli/npm/envio/src/Batch.res (5 hunks)
  • codegenerator/cli/npm/envio/src/FetchState.res (5 hunks)
  • codegenerator/cli/npm/envio/src/PgStorage.res (0 hunks)
  • codegenerator/cli/npm/envio/src/Utils.res (1 hunks)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res (0 hunks)
  • codegenerator/cli/templates/static/codegen/src/Config.res (3 hunks)
  • codegenerator/cli/templates/static/codegen/src/Env.res (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (6 hunks)
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2 hunks)
  • scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res (0 hunks)
  • scenarios/test_codegen/test/ChainManager_test.res (3 hunks)
  • scenarios/test_codegen/test/helpers/Mock.res (2 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res (14 hunks)
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res (0 hunks)
  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res (1 hunks)
  • scenarios/test_codegen/test/rollback/Rollback_test.res (1 hunks)
💤 Files with no reviewable changes (4)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res
  • codegenerator/cli/npm/envio/src/PgStorage.res
  • scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

Files:

  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res
  • codegenerator/cli/templates/static/codegen/src/Env.res
  • codegenerator/cli/templates/static/codegen/src/Config.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • scenarios/test_codegen/test/rollback/Rollback_test.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • scenarios/test_codegen/test/helpers/Mock.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • scenarios/test_codegen/test/ChainManager_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
**/*.res

Files:

  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res
  • codegenerator/cli/templates/static/codegen/src/Env.res
  • codegenerator/cli/templates/static/codegen/src/Config.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • scenarios/test_codegen/test/rollback/Rollback_test.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • scenarios/test_codegen/test/helpers/Mock.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • scenarios/test_codegen/test/ChainManager_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

Files:

  • codegenerator/cli/templates/static/codegen/src/Env.res
  • codegenerator/cli/templates/static/codegen/src/Config.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (18)
scenarios/test_codegen/test/helpers/Mock.res (1)

357-362: itemMock optional handler/contractRegister: type change is sensible for flexibility.

Switching to optional fields is aligned with the tests and avoids dummy no‑ops. No functional concerns.

scenarios/test_codegen/test/rollback/Rollback_test.res (1)

777-1027: Strong scenario coverage for dynamic contract rollback.

The sequence assertions (pre/post rollback, registry state, and resumed fetching from 105) look solid and align with “include all block events to a batch.”

scenarios/test_codegen/test/lib_tests/SourceManager_test.res (1)

197-199: dcsToStore default to []: OK.

Matches the refactor to non-optional dcsToStore and keeps updateInternal stable.

codegenerator/cli/templates/static/codegen/src/Env.res (1)

15-18: No direct references to Env.batchSize were found at call sites. The batchSize value defined in Env.res is exposed via the Env module but does not appear to be wired into the application’s configuration automatically, so callers must pass it to Config.make explicitly. Since downstream usage of Env.batchSize could not be confirmed, please verify manually that wherever the application’s top-level config is constructed (e.g., in cli/src/executor/dev.rs, cli/src/executor/codegen.rs, etc.), Env.batchSize is passed into Config.make(~batchSize=…), that no hard-coded defaults (like the literal 5000) remain, and that those call sites handle the None case by supplying a fallback.

scenarios/test_codegen/test/ChainManager_test.res (3)

162-162: LGTM: updatedFetchStates usage

Good switch to {items, updatedFetchStates}. This aligns with the new batch API.


183-187: LGTM: threading updated fetch states

Mapping updatedFetchStates back into chainFetchers looks correct.


279-279: LGTM: dcsToStore default is now []

Tests reflect the non‑optional list type.

codegenerator/cli/templates/static/codegen/src/Config.res (1)

117-118: LGTM: batchSize added to Config.t

Field addition is consistent with downstream usage.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

165-165: LGTM: migrate dcsToStore from option to array across tests

All expectations updated to [] or concrete arrays match the new FetchState.t shape.

Also applies to: 238-238, 304-304, 402-402, 708-708, 758-758, 878-878, 919-919, 942-942, 1226-1226, 1662-1662, 1702-1702, 1783-1783, 1870-1870

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)

902-905: LGTM: createBatch uses config.batchSize and updatedFetchStates

Correctly passes batchSizeTarget and extracts updatedFetchStates.


937-941: LGTM: queue updates with updatedFetchStates + threshold gating

Dispatching UpdateQueues({updatedFetchStates, shouldEnterReorgThreshold}) matches the new flow.

codegenerator/cli/npm/envio/src/FetchState.res (3)

63-65: LGTM: dcsToStore is now a concrete array

Type and comment updated appropriately.


593-595: LGTM: concat semantics for dcsToStore

Appending newly registered DCs when preexisting ones exist is correct.


1244-1247: LGTM: rollback filters dcsToStore by removed addresses

Good alignment with the new list representation.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (3)

49-55: Metrics wiring looks correct

Prometheus.ProcessingMaxBatchSize now reflects config.batchSize; IndexingTargetBufferSize uses the computed target. Good.


183-183: Correct source for progressed state

Using updatedFetchStates here is correct given earlier refactor.


242-247: Return shape matches the new public Batch.t

Return includes updatedFetchStates and dcsToStoreByChainId. LGTM.

codegenerator/cli/npm/envio/src/Batch.res (1)

12-14: Public field rename: updatedFetchStates — verified

Searched the repo; no remaining destructuring or property-access of the old batch field "fetchStates" was found. Templates and tests use updatedFetchStates (e.g. codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res:904,937; scenarios/test_codegen/test/ChainManager_test.res:162). Local parameter names "fetchStates" in Batch.res are unchanged and expected.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
codegenerator/cli/npm/envio/src/Utils.res (1)

197-200: Prefer spliceInPlace over raw length mutation

Avoiding %raw keeps the module consistent with existing array externals and reduces JS interop surface.

-  let clearInPlace: array<'a> => unit = %raw(`(arr) => {
-    arr.length = 0
-  }`)
+  let clearInPlace = (arr: array<'a>): unit => {
+    // Consistent with module's @send spliceInPlace external
+    arr->spliceInPlace(~pos=0, ~remove=arr->Array.length)->ignore
+  }
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (1)

926-941: Avoid progress-only commits during rollback risk (FIXME)

The FIXME is valid: advancing progress with empty items can mask needed rollbacks after restart.

Consider skipping DB progress writes for the “no items” branch when rollbackState is RollbackInMemStore, or gate behind a flag.

-        await Db.sql->InternalTable.Chains.setProgressedChains(
-          ~pgSchema=Db.publicSchema,
-          ~progressedChains,
-        )
+        switch state.rollbackState {
+        | RollbackInMemStore(_) => ()
+        | _ =>
+          await Db.sql->InternalTable.Chains.setProgressedChains(
+            ~pgSchema=Db.publicSchema,
+            ~progressedChains,
+          )
+        }

I can open a follow-up task and add a test case if you want.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

2135-2135: Remove stray debug log

Js.log(updatedFetchState) adds noise to test output.

-    Js.log(updatedFetchState)
+    // No-op
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)

131-134: Use int-key accessor consistently

Prefer dangerouslyGetByIntNonOption(fetchState.chainId) over stringifying the key.

-    switch batchSizePerChain->Utils.Dict.dangerouslyGetNonOption(fetchState.chainId->Int.toString) {
+    switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(fetchState.chainId) {
codegenerator/cli/npm/envio/src/Batch.res (1)

20-49: Ordered chain selection — tighten tiebreaker

Selection prefers the smaller timestamp; ties break by larger chainId due to the guard. Consider an explicit, stable tiebreak (e.g., prefer the lower chainId) to avoid depending on key iteration order.
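
A minimal sketch of an order-independent tiebreak, assuming a simplified record in place of FetchState.t. The names chainHead and pickNextChain are hypothetical and exist only for illustration; the guard mirrors the shape of the one used in getOrderedNextChain.

```rescript
// Hypothetical simplified view of a chain: only what the selection needs.
type chainHead = {chainId: int, timestamp: int}

// Picks the chain with the smallest timestamp. On equal timestamps the
// lower chainId wins, so the result does not depend on iteration order.
let pickNextChain = (chains: array<chainHead>) => {
  let best = ref(None)
  chains->Belt.Array.forEach(chain => {
    switch best.contents {
    | Some(current)
      if chain.timestamp > current.timestamp ||
        (chain.timestamp === current.timestamp && chain.chainId > current.chainId) => ()
    | _ => best := Some(chain)
    }
  })
  best.contents
}

let a = {chainId: 1, timestamp: 100}
let b = {chainId: 137, timestamp: 100}

// Same winner regardless of the order in which chains are visited.
assert(pickNextChain([a, b]) == Some(a))
assert(pickNextChain([b, a]) == Some(a))
```

Whether lower or higher chainId should win is a product decision; the point of the sketch is only that the rule is stated in the comparison itself.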

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cfd5531 and 62bbd7d.

📒 Files selected for processing (7)
  • codegenerator/cli/npm/envio/src/Batch.res (1 hunks)
  • codegenerator/cli/npm/envio/src/FetchState.res (6 hunks)
  • codegenerator/cli/npm/envio/src/Utils.res (2 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4 hunks)
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2 hunks)
  • scenarios/test_codegen/test/ChainManager_test.res (2 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res (25 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • scenarios/test_codegen/test/ChainManager_test.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • scenarios/test_codegen/test/ChainManager_test.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (19)
scenarios/test_codegen/test/ChainManager_test.res (2)

157-163: Tests aligned with new API (batchSizeTarget, updatedFetchStates) — LGTM

Calls and pattern matches updated correctly to ~batchSizeTarget and {items, updatedFetchStates}.


181-189: Correct propagation of updated fetch states

Mapping updatedFetchStates back into chainFetchers is accurate.

codegenerator/cli/npm/envio/src/Utils.res (1)

73-79: Add integer-key accessor — LGTM

Dict.dangerouslyGetByIntNonOption complements setByInt/incrementByInt and keeps call-sites clean where int keys are used.

Ensure all reads of int-keyed dicts use this accessor (e.g., places currently using dangerouslyGetNonOption(Int.toString ...)) for consistency.

codegenerator/cli/npm/envio/src/FetchState.res (7)

935-945: getTimestampAt: ordered-mode guard — LGTM

Throwing for Block in ordered multichain prevents undefined timestamp ordering.


954-978: Ready-count preserves block atomicity — LGTM

getReadyItemsCount continues within the current block after hitting targetSize. This avoids splitting blocks across batches.

Please confirm there’s a unit test asserting no block is split when targetSize cuts through a block.
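
A sketch of what such a test could assert, assuming a simplified item record and a standalone function that takes the buffer directly. The loop is modeled on the getReadyItemsCount body quoted later in this thread; the item type, the extra ~bufferBlockNumber argument, and the sample data are assumptions made for illustration.

```rescript
// Hypothetical simplified item; the real buffer holds Internal.item values.
type item = {blockNumber: int, logIndex: int}

// Counts ready items starting at fromItem. Once targetSize is reached,
// the ready boundary is narrowed to the current block, so the rest of
// that block is still counted but nothing from later blocks is.
let getReadyItemsCount = (buffer: array<item>, ~bufferBlockNumber, ~targetSize, ~fromItem) => {
  let readyBlockNumber = ref(bufferBlockNumber)
  let acc = ref(0)
  let isFinished = ref(false)
  while !isFinished.contents {
    switch buffer->Belt.Array.get(fromItem + acc.contents) {
    | Some(item) =>
      if item.blockNumber <= readyBlockNumber.contents {
        acc := acc.contents + 1
        if acc.contents === targetSize {
          readyBlockNumber := item.blockNumber
        }
      } else {
        isFinished := true
      }
    | None => isFinished := true
    }
  }
  acc.contents
}

let buffer = [
  {blockNumber: 1, logIndex: 0},
  {blockNumber: 2, logIndex: 0},
  {blockNumber: 2, logIndex: 1},
  {blockNumber: 2, logIndex: 2},
  {blockNumber: 3, logIndex: 0},
]

// targetSize=2 lands in the middle of block 2: all three block-2 items
// are included, and the block-3 item is left for the next batch.
let count = buffer->getReadyItemsCount(~bufferBlockNumber=3, ~targetSize=2, ~fromItem=0)
assert(count === 4)
```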


980-982: Unsafe access is fine with prior guard; keep usage constrained

getUnsafeItemAt assumes a prior readiness check. Keep it internal to code paths that already validated count to avoid accidental misuse.


1120-1121: Initialize dcsToStore as [] — LGTM

Simplifies logic vs option type and removes Some/None branching elsewhere.


1231-1234: Rollback dcsToStore filter — LGTM

Correctly prunes removed addresses from dcsToStore.


1269-1283: Guard against batchSizeTarget=0 and avoid OOB unsafe_get

With batchSizeTarget=0, targetBlockIdx equals the queue length, so unsafe_get reads one past the end of the queue. Handle non-positive targets and switch to the safe Array.get.

-  let hasFullBatch = ({queue} as fetchState: t, ~batchSizeTarget) => {
-    // Queue is ordered from latest to earliest, so the earliest eligible
-    // item for a full batch of size B is at index (length - B).
-    // Do NOT subtract an extra 1 here; when length === B we should still
-    // classify the queue as full and probe index 0.
-    let targetBlockIdx = queue->Array.length - batchSizeTarget
-    if targetBlockIdx < 0 {
-      false
-    } else {
-      // Unsafe can fail when batchSizeTarget is 0,
-      // but we ignore the case
-      queue->Js.Array2.unsafe_get(targetBlockIdx)->Internal.getItemBlockNumber <=
-        fetchState->bufferBlockNumber
-    }
-  }
+  let hasFullBatch = ({queue} as fetchState: t, ~batchSizeTarget) => {
+    if batchSizeTarget <= 0 {
+      false
+    } else {
+      let targetBlockIdx = queue->Array.length - batchSizeTarget
+      switch queue->Array.get(targetBlockIdx) {
+      | Some(item) => item->Internal.getItemBlockNumber <= fetchState->bufferBlockNumber
+      | None => false
+      }
+    }
+  }

1285-1306: Unordered prep uses readiness and stable ordering — LGTM

Filtering via hasReadyItem and sorting by earliest timestamp is appropriate for unordered mode.

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)

901-905: Config-driven batch size — LGTM

Switch to ~batchSizeTarget=state.config.batchSize aligns with PR goals.


942-954: Benchmark hooks for batch creation — LGTM

Recording creation time and size on empty/non-empty branches is helpful.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)

8-29: Back-compat test shim — LGTM

oldQueueItem, getItem, and getEarliestEvent accurately reflect new FetchState API while preserving prior test semantics.


2401-2404: Add 0/negative batchSizeTarget tests

Edge-case coverage would catch the hasFullBatch guard. Add a test with ~batchSizeTarget=0 to ensure no OOB and false returned.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (2)

9-16: Verify buffer sizing semantics

config.batchSize * (activeChainsCount > 3 ? 1 : 3) shrinks the buffer when many chains are active. Confirm this is intended (a target of 1 batch per chain when more than 3 chains are active vs 3 batches when there are 3 or fewer).


131-166: Don’t mutate arrays passed into updateInternal (aliasing hazard)

leftDcsToStore is passed into updateInternal and then mutated. Compute batchDcs/leftDcs first, then call updateInternal with the finalized leftDcs.

-      | dcs => {
-          let leftDcsToStore = []
-          let batchDcs = []
-          let updatedFetchState =
-            fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore)
-          let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
-          let nextProgressNextBlockLogIndex =
-            updatedFetchState->FetchState.getProgressNextBlockLogIndex
-
-          dcs->Array.forEach(dc => {
+      | dcs => {
+          // First advance state based on popped items
+          let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems)
+          let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber
+          let nextProgressNextBlockLogIndex = tmp->FetchState.getProgressNextBlockLogIndex
+
+          let batchDcs = []
+          let leftDcs = []
+          dcs->Array.forEach(dc => {
             // Important: This should be a registering block number.
             // This works for now since dc.startBlock is a registering block number.
             if (
               dc.startBlock <= nextProgressBlockNumber ||
                 switch (nextProgressNextBlockLogIndex, dc.register) {
                 | (Some(nextProgressNextBlockLogIndex), DC(dcData)) =>
                   dc.startBlock === nextProgressBlockNumber + 1 &&
                     dcData.registeringEventLogIndex <= nextProgressNextBlockLogIndex
                 | _ => false
                 }
             ) {
-              batchDcs->Array.push(dc)
+              batchDcs->Array.push(dc)
             } else {
-              // Mutate the array we passed to the updateInternal beforehand
-              leftDcsToStore->Array.push(dc)
+              leftDcs->Array.push(dc)
             }
           })
 
-          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
-          updatedFetchState
+          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
+          // Now persist remaining DCs
+          tmp->FetchState.updateInternal(~dcsToStore=leftDcs)
         }
codegenerator/cli/npm/envio/src/Batch.res (3)

78-121: Ordered batching respects block atomicity — LGTM

getReadyItemsCount with fromItem preserves block boundaries while filling to batchSizeTarget.


123-161: Unordered batching via prefiltered/sorted states — LGTM

Accumulates per-chain items without splitting blocks; per-chain sizes recorded in mutBatchSizePerChain.


51-58: Ready-item checks for modes — LGTM

hasMultichainReadyItem correctly gates batch creation based on multichain mode.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
codegenerator/cli/npm/envio/src/FetchState.res (1)

1256-1285: Avoid nondeterministic sort: remove random comparator branch.

Using Js.Math.random_int inside sort comparator can yield unstable and pathological sorts. Prefer deterministic fallback.

-        // Use unsafe since we filtered out all queues without batch items
-        switch (a.buffer->Belt.Array.getUnsafe(0), b.buffer->Belt.Array.getUnsafe(0)) {
-        | (Event({timestamp: aTimestamp}), Event({timestamp: bTimestamp})) =>
-          aTimestamp - bTimestamp
-        | (Block(_), _)
-        | (_, Block(_)) =>
-          // Currently block items don't have a timestamp,
-          // so we sort chains with them in a random order
-          Js.Math.random_int(-1, 1)
-        }
+        // Deterministic ordering: prefer earliest blockNumber; tie-break by timestamp when available; then chainId
+        let a0 = a.buffer->Belt.Array.getUnsafe(0)
+        let b0 = b.buffer->Belt.Array.getUnsafe(0)
+        let bnDiff =
+          a0->Internal.getItemBlockNumber - b0->Internal.getItemBlockNumber
+        if bnDiff !== 0 {
+          bnDiff
+        } else {
+          switch (a0, b0) {
+          | (Event({timestamp: aTs}), Event({timestamp: bTs})) =>
+            let tsDiff = aTs - bTs
+            if tsDiff !== 0 { tsDiff } else { a.chainId - b.chainId }
+          | _ => a.chainId - b.chainId
+          }
+        }
🧹 Nitpick comments (5)
scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res (2)

92-99: Fix outdated "reverse order" comments (buffer is earliest → latest).

The buffer stores items earliest-to-latest, but comments still say "reverse order (latest to earliest)". Update these to avoid confusion.

-    // Expected in reverse order (latest to earliest): block items have logIndex=16777216, event has logIndex=0
+    // Buffer order is earliest → latest: block items have logIndex=16777216, event has logIndex=0

Also applies to: 146-147, 194-195, 248-249, 300-301


93-97: Nit: rename local queue var to buffer for consistency.

You're reading from updatedFetchState.buffer but bind it to queue. Consider renaming to reduce cognitive load in future diffs.

Also applies to: 140-142, 188-191, 240-243, 295-298

scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)

1946-1953: Correct the assertion message: ordering is ASC, not DESC.

Expected buffer is sorted by (blockNumber ASC, logIndex ASC). Update the message.

-      ~message="Queue must be sorted DESC by (blockNumber, logIndex) regardless of input order",
+      ~message="Buffer must be sorted ASC by (blockNumber, logIndex) regardless of input order",

2135-2135: Remove leftover debug log.

Js.log(updatedFetchState) is noisy in tests.

-    Js.log(updatedFetchState)
+    /* removed debug log */
codegenerator/cli/npm/envio/src/FetchState.res (1)

948-969: Guard getReadyItemsCount for non‑positive targetSize.

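Without this, targetSize=0 never satisfies the acc === targetSize check (acc is already 1 at the first comparison), so the loop keeps counting every item up to the buffer block. Add an early return.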

 let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+  if targetSize <= 0 {
+    0
+  } else {
   let readyBlockNumber = ref(fetchState->bufferBlockNumber)
   let acc = ref(0)
   let isFinished = ref(false)
   while !isFinished.contents {
     switch fetchState.buffer->Belt.Array.get(fromItem + acc.contents) {
     | Some(item) =>
       let itemBlockNumber = item->Internal.getItemBlockNumber
       if itemBlockNumber <= readyBlockNumber.contents {
         acc := acc.contents + 1
         if acc.contents === targetSize {
           // Should finish accumulating items from the same block
           readyBlockNumber := itemBlockNumber
         }
       } else {
         isFinished := true
       }
     | None => isFinished := true
     }
   }
   acc.contents
+  }
 }
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 62bbd7d and 7a6ccad.

📒 Files selected for processing (6)
  • codegenerator/cli/npm/envio/src/Batch.res (1 hunks)
  • codegenerator/cli/npm/envio/src/FetchState.res (16 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res (6 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res (34 hunks)
  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res (3 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/npm/envio/src/Batch.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/lib_tests/SourceManager_test.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/npm/envio/src/Batch.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (15)
scenarios/test_codegen/test/lib_tests/SourceManager_test.res (2)

152-154: Initialize buffer and dcsToStore as arrays (LGTM).

Switch to ~buffer=[] and dcsToStore: [] aligns with the new API.

Also applies to: 189-198


691-699: Pre-seeding buffer for targetBufferSize tests (LGTM).

Using buffer seeds to simulate cap behavior matches the new buffering semantics.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (4)

8-28: Backward-compatibility shim (LGTM).

oldQueueItem, getItem, and getEarliestEvent correctly adapt tests to buffer semantics and ready-count API.


175-181: Adopting buffer and dcsToStore: [] in state assertions (LGTM).

Assertions reflect the new concrete fields and ordering.

Also applies to: 246-254, 319-321, 410-418, 888-894


2042-2046: LGTM: earliest-event helper usage matches new buffer semantics.

The checks via getEarliestEvent->getItem are consistent and safe.

Also applies to: 2074-2076, 2110-2116, 2178-2180


2401-2405: LGTM: filterAndSortForUnorderedBatch tests target new batchSizeTarget.

Coverage is good for half/full batches and earliest timestamp prioritization.

Also applies to: 2443-2447, 2485-2489

codegenerator/cli/npm/envio/src/FetchState.res (2)

188-194: Comparator matches new buffer order (LGTM).

ASC by blockNumber, then logIndex, aligns with earliest→latest buffer semantics.


1118-1128: Rollback buffer pruning and dcs filtering (LGTM).

Pruning by (blockNumber, logIndex) and filtering dcsToStore against removed addresses are correct.

Also applies to: 1216-1222

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4)

9-16: Config‑aware targetBufferSize (LGTM).

Deriving targetBufferSize from config.batchSize and chains count looks sound; metrics wiring is updated accordingly.

Also applies to: 49-55


98-106: Expose fetchStates (LGTM).

getFetchStates accessor simplifies downstream batching logic.


108-122: Batching API updates (LGTM).

  • prepareOrdered/UnorderedBatch(~batchSizeTarget, ...) usage is consistent.
  • Returning updatedFetchStates and creationTimeMs aligns with the new contract.

Also applies to: 123-124, 130-171, 205-212


137-167: Don’t mutate arrays after passing them into updateInternal.

leftDcsToStore is passed to updateInternal and then mutated, which is brittle aliasing. Partition first, then pass the “left” slice.

-      | dcs => {
-          let leftDcsToStore = []
-          let batchDcs = []
-          let updatedFetchState =
-            fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore)
-          let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
-          let nextProgressNextBlockLogIndex =
-            updatedFetchState->FetchState.getProgressNextBlockLogIndex
-
-          dcs->Array.forEach(dc => {
-            // Important: This should be a registering block number.
-            // This works for now since dc.startBlock is a registering block number.
-            if (
-              dc.startBlock <= nextProgressBlockNumber ||
-                switch (nextProgressNextBlockLogIndex, dc.register) {
-                | (Some(nextProgressNextBlockLogIndex), DC(dcData)) =>
-                  dc.startBlock === nextProgressBlockNumber + 1 &&
-                    dcData.registeringEventLogIndex <= nextProgressNextBlockLogIndex
-                | _ => false
-                }
-            ) {
-              batchDcs->Array.push(dc)
-            } else {
-              // Mutate the array we passed to the updateInternal beforehand
-              leftDcsToStore->Array.push(dc)
-            }
-          })
-
-          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
-          updatedFetchState
-        }
+      | dcs => {
+          // First, advance internal state based on popped items.
+          let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems)
+          let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber
+          let nextProgressNextBlockLogIndex = tmp->FetchState.getProgressNextBlockLogIndex
+
+          let batchDcs = []
+          let leftDcs = []
+          dcs->Array.forEach(dc => {
+            let shouldStoreNow =
+              dc.startBlock <= nextProgressBlockNumber ||
+              switch (nextProgressNextBlockLogIndex, dc.register) {
+              | (Some(nextIdx), DC(dcData)) =>
+                dc.startBlock === nextProgressBlockNumber + 1 &&
+                dcData.registeringEventLogIndex <= nextIdx
+              | _ => false
+              }
+            if shouldStoreNow { batchDcs->Array.push(dc) } else { leftDcs->Array.push(dc) }
+          })
+
+          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
+          // Now persist the remaining DCs in the fetch state.
+          tmp->FetchState.updateInternal(~dcsToStore=leftDcs)
+        }
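
The aliasing concern can be illustrated in isolation. This is a sketch under stated assumptions: the state record and makeState are hypothetical stand-ins for a value that stores the array it is given, not the real FetchState.updateInternal.

```rescript
// Hypothetical stand-in for a state record that keeps the array it receives.
type state = {dcsToStore: array<int>}

let makeState = (~dcsToStore) => {dcsToStore: dcsToStore}

// Aliased: the array is handed over first and filled afterwards,
// so the state's contents change after it was constructed.
let shared: array<int> = []
let aliased = makeState(~dcsToStore=shared)
assert(aliased.dcsToStore->Belt.Array.length === 0)
shared->Js.Array2.push(42)->ignore
assert(aliased.dcsToStore->Belt.Array.length === 1)

// Partition first: the array is complete before the state sees it.
let left: array<int> = []
left->Js.Array2.push(42)->ignore
let finalized = makeState(~dcsToStore=left)
assert(finalized.dcsToStore->Belt.Array.length === 1)
```

Both variants end with the same contents; the first only works as long as the callee keeps the same array reference instead of copying or filtering it.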
codegenerator/cli/npm/envio/src/Batch.res (3)

20-49: Ordered chain selection by timestamp index (LGTM, with tie-break).

getOrderedNextChain respects per-chain offsets and provides deterministic chainId tie-break.


78-121: Block‑atomic accumulation in ordered mode (LGTM).

Using getReadyItemsCount(~fromItem=itemsCountBefore) ensures full-block inclusion when crossing targets.


123-161: Unordered preparation honors full‑batch preference and block atomicity (LGTM).

Integration with FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget) and getReadyItemsCount is correct.

@DZakh DZakh changed the base branch from main to dz/prevent-dc-overstore September 23, 2025 11:31
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

1945-1952: Fix assertion message: ordering is ASC, and it’s a buffer

Expected order is earliest→latest. Update the message for accuracy.

-      ~message="Queue must be sorted DESC by (blockNumber, logIndex) regardless of input order",
+      ~message="Buffer must be sorted ASC by (blockNumber, logIndex) regardless of input order",
codegenerator/cli/npm/envio/src/FetchState.res (2)

948-969: Guard getReadyItemsCount for non‑positive targetSize

Avoid surprising behavior when targetSize <= 0.

-let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+  if targetSize <= 0 {
+    0
+  } else {
     let readyBlockNumber = ref(fetchState->bufferBlockNumber)
     let acc = ref(0)
     let isFinished = ref(false)
     while !isFinished.contents {
       switch fetchState.buffer->Belt.Array.get(fromItem + acc.contents) {
       | Some(item) =>
         let itemBlockNumber = item->Internal.getItemBlockNumber
         if itemBlockNumber <= readyBlockNumber.contents {
           acc := acc.contents + 1
           if acc.contents === targetSize {
             // Should finish accumulating items from the same block
             readyBlockNumber := itemBlockNumber
           }
         } else {
           isFinished := true
         }
       | None => isFinished := true
       }
     }
     acc.contents
+  }
 }

1256-1261: Edge‑case: treat batchSizeTarget <= 0 as never full

Current code is safe via Belt.Array.get, but making intent explicit improves readability.

-  let hasFullBatch = ({buffer} as fetchState: t, ~batchSizeTarget) => {
-    switch buffer->Belt.Array.get(batchSizeTarget - 1) {
+  let hasFullBatch = ({buffer} as fetchState: t, ~batchSizeTarget) => {
+    if batchSizeTarget <= 0 {
+      false
+    } else switch buffer->Belt.Array.get(batchSizeTarget - 1) {
       | Some(item) => item->Internal.getItemBlockNumber <= fetchState->bufferBlockNumber
       | None => false
     }
   }
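
A self-contained sketch of the edge cases, assuming a simplified item record and a standalone function that receives bufferBlockNumber as an argument (both are illustrative simplifications of the code in the diff above).

```rescript
// Hypothetical simplified item; only the block number matters here.
type item = {blockNumber: int}

// A batch is full when the item at position (batchSizeTarget - 1)
// exists and is already within the ready block range.
let hasFullBatch = (buffer: array<item>, ~bufferBlockNumber, ~batchSizeTarget) =>
  if batchSizeTarget <= 0 {
    false
  } else {
    switch buffer->Belt.Array.get(batchSizeTarget - 1) {
    | Some(item) => item.blockNumber <= bufferBlockNumber
    | None => false
    }
  }

let buffer = [{blockNumber: 1}, {blockNumber: 2}, {blockNumber: 3}]

assert(buffer->hasFullBatch(~bufferBlockNumber=2, ~batchSizeTarget=2) === true)
// The third item is beyond the ready block.
assert(buffer->hasFullBatch(~bufferBlockNumber=2, ~batchSizeTarget=3) === false)
// Fewer items than the target.
assert(buffer->hasFullBatch(~bufferBlockNumber=3, ~batchSizeTarget=4) === false)
// Non-positive targets are never full.
assert(buffer->hasFullBatch(~bufferBlockNumber=3, ~batchSizeTarget=0) === false)
```

Without the explicit guard the last case still returns false, because Belt.Array.get returns None for index -1; the guard only makes that intent visible.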
codegenerator/cli/npm/envio/src/Batch.res (2)

16-18: Fix typo in docstring

“returnes” → “returns”.

-/**
- It either returnes an earliest item among all chains, or None if no chains are actively indexing
- */
+/**
+ Returns the earliest chain among all chains, or None if no chains are actively indexing.
+ */

19-48: Clarify tie-break logic and avoid variable shadowing for readability

Rename the outer ref and inner matched value to avoid shadowing and make the tie-break clearer.

-let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
-  let earliestChain: ref<option<FetchState.t>> = ref(None)
+let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
+  let earliestChainRef: ref<option<FetchState.t>> = ref(None)
   let earliestChainTimestamp = ref(0)
   let chainKeys = fetchStates->ChainMap.keys
   for idx in 0 to chainKeys->Array.length - 1 {
     let chain = chainKeys->Array.get(idx)
     let fetchState = fetchStates->ChainMap.get(chain)
     if fetchState->FetchState.isActivelyIndexing {
       let timestamp = fetchState->FetchState.getTimestampAt(
         ~index=switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
           chain->ChainMap.Chain.toChainId,
         ) {
         | Some(batchSize) => batchSize
         | None => 0
         },
       )
-      switch earliestChain.contents {
-      | Some(earliestChain)
+      switch earliestChainRef.contents {
+      | Some(currEarliest)
         if timestamp > earliestChainTimestamp.contents ||
           (timestamp === earliestChainTimestamp.contents &&
-            chain->ChainMap.Chain.toChainId > earliestChain.chainId) => ()
+            chain->ChainMap.Chain.toChainId > currEarliest.chainId) => ()
       | _ => {
-          earliestChain := Some(fetchState)
+          earliestChainRef := Some(fetchState)
           earliestChainTimestamp := timestamp
         }
       }
     }
-  }
-  earliestChain.contents
+  }
+  earliestChainRef.contents
 }
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)

9-16: Double-check buffer sizing formula

Using 1 when activeChainsCount > 3 likely under-provisions the buffer as chains scale. If the intent is “at least 3 batches or activeChainsCount batches,” consider:

-  | None =>
-    config.batchSize * (activeChainsCount > targetBatchesInBuffer ? 1 : targetBatchesInBuffer)
+  | None =>
+    config.batchSize
+    * (activeChainsCount > targetBatchesInBuffer ? activeChainsCount : targetBatchesInBuffer)

If the existing behavior is intentional, add a brief comment explaining the rationale. Otherwise, apply the change above.
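
To make the difference concrete, here is a small worked comparison of the two formulas. The function names and batchSize=5000 are illustrative assumptions; only the expressions themselves come from the diff above.

```rescript
let targetBatchesInBuffer = 3

// Behavior under review: a single batch of buffer once more than 3 chains are active.
let currentTarget = (~batchSize, ~activeChainsCount) =>
  batchSize * (activeChainsCount > targetBatchesInBuffer ? 1 : targetBatchesInBuffer)

// Suggested alternative: never fewer than 3 batches, growing with the chain count.
let suggestedTarget = (~batchSize, ~activeChainsCount) =>
  batchSize * (
    activeChainsCount > targetBatchesInBuffer ? activeChainsCount : targetBatchesInBuffer
  )

// With 2 chains both formulas agree on 3 batches.
assert(currentTarget(~batchSize=5000, ~activeChainsCount=2) === 15000)
assert(suggestedTarget(~batchSize=5000, ~activeChainsCount=2) === 15000)

// With 5 chains the current formula drops to 1 batch, the suggestion grows to 5.
assert(currentTarget(~batchSize=5000, ~activeChainsCount=5) === 5000)
assert(suggestedTarget(~batchSize=5000, ~activeChainsCount=5) === 25000)
```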

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 7a6ccad and 788ccae.

📒 Files selected for processing (8)
  • codegenerator/cli/npm/envio/src/Batch.res (1 hunks)
  • codegenerator/cli/npm/envio/src/FetchState.res (16 hunks)
  • codegenerator/cli/npm/envio/src/Persistence.res (1 hunks)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (3 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res (34 hunks)
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/npm/envio/src/Persistence.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/npm/envio/src/Persistence.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (21)
codegenerator/cli/npm/envio/src/Persistence.res (1)

145-147: Resume checkpoints: progressBlockNumber is the right pivot — LGTM

Switching checkpoints to use c.progressBlockNumber per chain matches the schema/API changes and simplifies resume.

scenarios/test_codegen/test/lib_tests/PgStorage_test.res (3)

147-147: envio_chains DDL aligns with progress-based tracking

Schema includes progress_block and _num_batches_fetched with sensible NOT NULL constraints. Looks consistent across schemas.

Also applies to: 232-232, 314-314


187-189: Initial INSERTs match the new column set and ordering

Column lists explicitly include source_block/buffer_block/progress_block; sentinel -1 for buffer/progress aligns with restart logic.

Please confirm we consistently treat progress_block = -1 as “unset” (e.g., ChainFetcher.makeFromDbState).

Also applies to: 581-583, 608-610, 644-647


533-536: Progress fields UPDATE placeholders are correct

Ordering: id=$1, progress_block=$2, events_processed=$3 is coherent and unambiguous.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (2)

271-276: Restart fallback logic is correct and explicit

Uses resumedChainState.progressBlockNumber when >= 0, else (startBlock - 1). Passed through to make as progressBlockNumber.

Also applies to: 285-286
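
For reference, the fallback described above can be sketched as follows. This is a minimal illustration, not the code in ChainFetcher.res: the `resumedChainState` record is reduced to a single field, and the helper name `getProgressBlockNumber` with its labeled arguments is an assumption made for the example.

```rescript
// Hypothetical, simplified shape of the resumed chain state.
type resumedChainState = {progressBlockNumber: int}

// -1 is treated as the "unset" sentinel, so any negative value falls back
// to the block just before startBlock.
let getProgressBlockNumber = (~resumed: resumedChainState, ~startBlock: int) =>
  resumed.progressBlockNumber >= 0 ? resumed.progressBlockNumber : startBlock - 1

// Fresh chain: nothing processed yet, resume from startBlock - 1.
let fresh = getProgressBlockNumber(~resumed={progressBlockNumber: -1}, ~startBlock=100)
// Previously progressed chain: keep the stored progress block.
let resumedAt = getProgressBlockNumber(~resumed={progressBlockNumber: 250}, ~startBlock=100)

Js.log2(fresh, resumedAt) // logs: 99 250
```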


207-208: processingFilters default to None across constructors

Constructor sets processingFilters: None; addProcessingFilter still enables runtime filters when needed.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)

8-16: Back‑compat shim (oldQueueItem + getEarliestEvent) is tidy

Keeps legacy Some/None behavior while exercising new buffer semantics. Good coverage.

Also applies to: 19-28


2403-2408: Tests updated to batchSizeTarget — LGTM

Renames and expectations around filterAndSortForUnorderedBatch reflect the new API.

Also applies to: 2445-2450, 2485-2492

codegenerator/cli/npm/envio/src/FetchState.res (2)

64-65: dcsToStore now a plain array — consistent and simpler

Removal of option reduces branching; updated writes/filters look correct.

Also applies to: 571-574, 1115-1116, 1217-1221


79-80: Buffer semantics and ordering are coherent

  • Buffer is earliest→latest; comparator sorts by (blockNumber, logIndex) ascending.
  • Metrics updated to use buffer size/block. All consistent.

Also applies to: 188-194, 310-315, 323-329
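
A minimal sketch of the ascending ordering described above, assuming a simplified `item` record with only the two sort keys (the real item type in FetchState.res carries more fields, and `compareItems` is a name chosen for this example):

```rescript
// Hypothetical, reduced item type: only the fields used for ordering.
type item = {blockNumber: int, logIndex: int}

// Ascending by blockNumber, then by logIndex within the same block.
let compareItems = (a: item, b: item) =>
  if a.blockNumber !== b.blockNumber {
    a.blockNumber - b.blockNumber
  } else {
    a.logIndex - b.logIndex
  }

let buffer = [
  {blockNumber: 12, logIndex: 0},
  {blockNumber: 10, logIndex: 3},
  {blockNumber: 10, logIndex: 1},
]

// Js.Array2.sortInPlaceWith mutates the array and returns it.
// Resulting order: (10, 1), (10, 3), (12, 0), so the earliest item is at index 0.
let sorted = buffer->Js.Array2.sortInPlaceWith(compareItems)
```

With the earliest item at index 0, a batch can be taken as a prefix of the buffer rather than by popping from the end.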

codegenerator/cli/npm/envio/src/Batch.res (6)

11-14: Type surface changes look good

updatedFetchStates and creationTimeMs integration is clear and consistent with the PR goals.


50-57: LGTM on readiness helper

Efficient reuse via an immutable empty dict and correct gating on hasReadyItem.


59-65: LGTM

Correct readiness check for unordered mode.


67-75: Mode-gated readiness is clear

Switching on multichain mode is straightforward.


122-160: Verified — unordered path won't slice a block mid-way

getReadyItemsCount (FetchState.res) updates readyBlockNumber to the block of the target-th item and continues consuming that block; filterAndSortForUnorderedBatch prioritizes full batches. prepareUnorderedBatch calls getReadyItemsCount per chain, so blocks are not split across batches.


77-118: Confirm getReadyItemsCount preserves block atomicity and may exceed the remaining target

FetchState.res:getReadyItemsCount sets readyBlockNumber to the block of the item that first reaches targetSize and then continues counting only items with that blockNumber, so it will include the rest of that block (possibly exceeding targetSize). It uses blockNumber (not logIndex) and relies on buffer ordering to preserve per-block logIndex order.
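
The counting behaviour described in this comment can be sketched as below. This is an illustration under assumptions, not the function in FetchState.res: it works on a plain array of a reduced `item` type, omits the `~fromItem` argument, uses the hypothetical name `countReadyItems`, and assumes `targetSize >= 1` and a buffer already sorted in ascending order.

```rescript
// Hypothetical, reduced item type.
type item = {blockNumber: int, logIndex: int}

// Returns how many leading buffer items to take: up to targetSize items,
// extended to the end of the block containing the targetSize-th item,
// so a block is never split across batches.
let countReadyItems = (buffer: array<item>, ~targetSize: int) => {
  let count = ref(0)
  let readyBlockNumber = ref(None)
  let isDone = ref(false)
  while !isDone.contents && count.contents < buffer->Belt.Array.length {
    let item = buffer->Belt.Array.getUnsafe(count.contents)
    switch readyBlockNumber.contents {
    // Target already reached and a new block starts: stop here.
    | Some(blockNumber) if item.blockNumber !== blockNumber => isDone := true
    | _ => {
        count := count.contents + 1
        // Remember the block of the item that first reaches the target.
        if Belt.Option.isNone(readyBlockNumber.contents) && count.contents >= targetSize {
          readyBlockNumber := Some(item.blockNumber)
        }
      }
    }
  }
  count.contents
}

let buffer = [
  {blockNumber: 10, logIndex: 0},
  {blockNumber: 10, logIndex: 1},
  {blockNumber: 11, logIndex: 0},
  {blockNumber: 11, logIndex: 1},
  {blockNumber: 11, logIndex: 2},
  {blockNumber: 12, logIndex: 0},
]

// The third item is in block 11, so all of block 11 is included: result is 5.
let readyCount = buffer->countReadyItems(~targetSize=3)
```

In this example the result exceeds the target of 3 by two items, which is the "may exceed the remaining target" case the comment asks to confirm.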

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (5)

49-54: Prometheus max batch size source updated

Switching to config.batchSize matches the new config-driven sizing.


98-102: Fetch state access simplified

Directly exposing cf.fetchState is clear and avoids deep-copy overhead.


108-124: Batch creation flow aligns with new APIs

Mode selection and per-chain counters are wired correctly to Batch.prepare*.


130-161: Avoid mutating dcsToStore after passing into updateInternal

leftDcsToStore is passed into updateInternal and then mutated (Line 149). This aliasing is brittle and couples to updateInternal’s internal semantics.

Apply this safer partition-first approach:

-      | dcs => {
-          let leftDcsToStore = []
-          let batchDcs = []
-          let updatedFetchState =
-            fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore)
-          let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
-
-          dcs->Array.forEach(dc => {
-            // Important: This should be a registering block number.
-            // This works for now since dc.startBlock is a registering block number.
-            if dc.startBlock <= nextProgressBlockNumber {
-              batchDcs->Array.push(dc)
-            } else {
-              // Mutate the array we passed to the updateInternal beforehand
-              leftDcsToStore->Array.push(dc)
-            }
-          })
-
-          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
-          updatedFetchState
-        }
+      | dcs => {
+          // First, advance internal state based on popped items.
+          let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems)
+          let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber
+
+          let batchDcs = []
+          let leftDcs = []
+          dcs->Array.forEach(dc => {
+            // dc.startBlock is a registering block number.
+            if dc.startBlock <= nextProgressBlockNumber {
+              batchDcs->Array.push(dc)
+            } else {
+              leftDcs->Array.push(dc)
+            }
+          })
+
+          dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
+          // Now persist the remaining DCs in the fetch state.
+          tmp->FetchState.updateInternal(~dcsToStore=leftDcs)
+        }

167-201: Progress/accounting assembly looks consistent

progressedChains and creationTimeMs fields are computed coherently with per-chain sizes.

Optional: add a debug counter or assertion in non-prod builds to verify
sum(batchSizePerChain) === items.length for early detection of consistency issues.
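
One possible shape for such a check, as a sketch only. The helper name `assertBatchSizeConsistent` and its labeled arguments are hypothetical, and it assumes the per-chain counters live in a `Js.Dict.t<int>` keyed by chain id:

```rescript
// Hypothetical debug helper: the per-chain counters should add up to the
// number of items in the batch.
let assertBatchSizeConsistent = (~batchSizePerChain: Js.Dict.t<int>, ~itemsCount: int) => {
  let total =
    batchSizePerChain->Js.Dict.values->Belt.Array.reduce(0, (acc, size) => acc + size)
  if total !== itemsCount {
    Js.Exn.raiseError(
      `Batch size mismatch: per-chain sum is ${total->Belt.Int.toString}, items length is ${itemsCount->Belt.Int.toString}`,
    )
  }
}

// Passes silently: 3 + 2 === 5.
let sizes = Js.Dict.fromArray([("1", 3), ("137", 2)])
assertBatchSizeConsistent(~batchSizePerChain=sizes, ~itemsCount=5)
```

Whether to gate this behind a non-production flag is left open; the sketch raises unconditionally.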

@DZakh DZakh requested a review from JonoPrest September 23, 2025 11:39
Base automatically changed from dz/prevent-dc-overstore to main September 23, 2025 17:04
NoItem({
latestFetchedBlock: fetchState->bufferBlock,
})
let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
Collaborator

Is this only used in the tests? If so, maybe note that in a comment or in the name.

Collaborator

Never mind, I see where it's used.

~persistence=codegenPersistence,
~ecosystem=InternalConfig.Evm,
~registrations=?,
~batchSize=5000,
Collaborator

I think this should default to Env.maxBatchSize, or whatever the env var is?

Member Author

Oh, I forgot to pass an env variable to the config 😅

Comment on lines +936 to +939
// FIXME: When state.rollbackState is RollbackInMemStore
// If we increase progress in this case (no items)
// and then indexer restarts - there's a high chance of missing
// the rollback. This should be tested and fixed.
Collaborator

Is this introduced by this PR, or was it the previous behaviour?

Member Author

This was the behaviour before.

Collaborator

@JonoPrest JonoPrest left a comment

Cool. Do you think we have sufficient test coverage on the batch creation and fetch state updates for this change? It's quite a significant change otherwise, and it's difficult to be sure just from code review.

@DZakh
Member Author

DZakh commented Sep 25, 2025

I'd say the test coverage is quite decent.

@DZakh DZakh enabled auto-merge (squash) September 30, 2025 08:28
@DZakh
Member Author

DZakh commented Sep 30, 2025

I removed ENVIO_BATCH_SIZE in the end. I'd like to move to config first instead of env vars.

@DZakh DZakh merged commit d8846b4 into main Sep 30, 2025
2 checks passed
@DZakh DZakh deleted the dz/full-block-in-batch branch September 30, 2025 08:37
@coderabbitai coderabbitai bot mentioned this pull request Oct 21, 2025