Skip to content

Conversation

@DZakh
Copy link
Member

@DZakh DZakh commented Sep 2, 2025

  • Add progressBlock and sourceBlock to _meta query
  • Removed internal event_sync_state table and started tracking indexing progress in envio_chains. This lead to:
    • Enables Block Handlers feature
    • Makes processed events counter restart resistant (still migth have miscalculations on reorgs)
    • Makes restarts for indexers without events more efficient.
  • Removed cache from .gitignore in init templates
  • Add detailed indexer resume log with checkpoints for uni v4 debugging
  • Breaking change: Removed eventSyncState from mock db

Summary by CodeRabbit

  • New Features

    • Multi-chain event batching with ordered/unordered strategies and per-chain progressedChains reporting.
  • Refactor

    • Global move from "latest processed" to committed progress model; batch-driven execution and new progressedChains payloads; DB schema now tracks progress_block/_progress_log_index.
  • Chores

    • Storage API renamed to resumeInitialState; removed legacy event-sync state APIs; improved exception log formatting; small tooling tweaks (Dict helpers, pool debug signature, .gitignore).
  • Tests

    • Mocks and tests updated to match new persistence/progress model.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 2, 2025

Walkthrough

Adds a multi-chain Batch module with ordered/unordered batching and per-chain progressedChain tracking; replaces EventSyncState with per-chain progress fields on envio_chains; renames loadInitialState → resumeInitialState and runs cleanup-on-restart; and rewires fetcher, ChainManager, IO, mocks, and tests to use progressedChains.

Changes

Cohort / File(s) Summary
Batching & fetch orchestration
codegenerator/cli/npm/envio/src/Batch.res, codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res, scenarios/test_codegen/test/ChainManager_test.res
New Batch module and types (progressedChain, Batch.t), comparator utilities and ordered/unordered popping (popOrderedBatchItems, popUnorderedBatchItems); ChainManager refactored to call Batch APIs and return Batch.t; tests updated to reference Batch functions.
FetchState progress helpers & tests
codegenerator/cli/npm/envio/src/FetchState.res, scenarios/test_codegen/test/lib_tests/FetchState_test.res
Added getProgressBlockNumber and getProgressNextBlockLogIndex helpers and tests validating progress semantics.
Chain fetcher / fetcher state
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res, codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
Replaced optional latestProcessedBlock with concrete committedProgressBlockNumber/progressBlockNumber; restart/filter logic switched to use progress fields; removed old restart cleanup calls; integrated Batch-driven batch creation.
Persistence & startup
codegenerator/cli/npm/envio/src/Persistence.res, codegenerator/cli/npm/envio/src/PgStorage.res, scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res
Storage API rename loadInitialStateresumeInitialState; PgStorage.resumeInitialState runs DynamicContractRegistry.makeCleanUpOnRestartQuery and issues cleanup via Postgres.unsafe when chains present; startup table-detection logic adjusted; tests adapted to use local PgStorage and resume flow.
DB schema & internals
codegenerator/cli/npm/envio/src/db/InternalTable.res, codegenerator/cli/npm/envio/src/db/InternalTable.gen.ts, scenarios/test_codegen/test/lib_tests/PgStorage_test.res
Removed EventSyncState module/types; added progress_block and _progress_log_index to envio_chains, new Chains.metaFields/progressFields, split update paths (setMeta, setProgressedChains), and added DynamicContractRegistry.makeCleanUpOnRestartQuery; TS type EventSyncState_t removed.
Event processing & IO
codegenerator/cli/templates/static/codegen/src/EventProcessing.res, codegenerator/cli/templates/static/codegen/src/IO.res, codegenerator/cli/templates/static/codegen/src/InMemoryStore.res, codegenerator/cli/templates/static/codegen/src/eventFetching/Index.res
processEventBatch and IO.executeBatch signatures now accept ~progressedChains; removed eventSyncState writes; IO now calls InternalTable.Chains.setProgressedChains within batch transaction before raw events/entities; InMemoryStore removed eventSyncState; Index wiring updated to use committedProgressBlockNumber.
Db functions / implementation
codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res, codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js, codegenerator/cli/templates/static/codegen/src/db/Db.res
Removed EventSyncState externals and JS implementations (readLatestSyncedEventOnChainId, batchSetEventSyncState, etc.) and some restart-cleanup externals; added commented Postgres debug hook.
Mocks & tests
codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs, scenarios/test_codegen/test/helpers/Mock.res, scenarios/test_codegen/test/lib_tests/Persistence_test.res
Removed eventSyncState from mock DB and mock store operators; mock storage API renamed to expose resumeInitialState; tests and mock internals updated to use resumeInitialState naming.
Utilities & logging / bindings
codegenerator/cli/npm/envio/src/Utils.res, codegenerator/cli/npm/envio/src/Throttler.res, codegenerator/cli/npm/envio/src/bindings/Postgres.res
Added Dict.setByInt external and Dict.incrementByInt; Throttler logs now pass prettified exceptions; Postgres poolConfig.debug signature changed to a labeled four-arg function.
Misc / packaging
codegenerator/cli/templates/static/blank_template/shared/.gitignore
Removed ignore rule for cache path.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant CM as ChainManager
  participant Batch as Batch
  participant FS as FetchState (per-chain)
  participant IO as IO.executeBatch
  participant DB as Postgres (Chains / RawEvents)

  CM->>Batch: popOrderedBatchItems / popUnorderedBatchItems(fetchStates, maxBatchSize)
  Batch->>FS: getEarliestEvent / popItemOffQueue (per chain)
  Batch-->>CM: Batch.t { items, progressedChains, fetchStates }
  CM->>IO: executeBatch(sql, ~progressedChains, ~inMemoryStore, ...)
  IO->>DB: InternalTable.Chains.setProgressedChains(progressedChains)
  IO->>DB: insert raw_events and entities
  DB-->>IO: commit / ok
Loading
sequenceDiagram
  autonumber
  participant App as App
  participant PG as PgStorage.resumeInitialState
  participant DCR as DynamicContractRegistry
  participant SQL as Postgres

  App->>PG: resumeInitialState()
  PG->>SQL: SELECT envio_chains
  alt chains not empty
    PG->>DCR: makeCleanUpOnRestartQuery(pgSchema, chains)
    DCR-->>PG: cleanup SQL
    PG->>SQL: Postgres.unsafe(cleanup SQL)
  end
  PG-->>App: initialState
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Possibly related PRs

Suggested reviewers

  • moose-code
  • JonoPrest

Poem

"I nibble queues from chain to chain,
stack progress blocks like carrot grain.
EventSync slept; new fields awake,
resume the race and tidy the stake.
Hop — batch by batch — we clear the lane." 🐇

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch dz/progress-block

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
codegenerator/cli/templates/static/codegen/src/EventProcessing.res (1)

286-295: Bug: Missing ~progressedChains when calling IO.executeBatch

executeBatch now requires ~progressedChains. The current call omits it and won’t compile.

Apply:

-      switch await Db.sql->IO.executeBatch(
-        ~inMemoryStore,
+      switch await Db.sql->IO.executeBatch(
+        ~progressedChains,
+        ~inMemoryStore,
         ~isInReorgThreshold,
         ~config,
         ~escapeTables?,
       ) {
codegenerator/cli/npm/envio/src/db/InternalTable.res (1)

358-372: Null-out the progress_block sentinel in chain metadata view
In makeChainMetadataViewQuery (lines ~374–389 of InternalTable.res), wrap the progress_block column in NULLIF to preserve its nullable semantics and avoid exposing the -1 sentinel to consumers:

-       "${(#progress_block: Chains.field :> string)}" AS "latest_processed_block",
+       NULLIF("${(#progress_block: Chains.field :> string)}", -1) AS "latest_processed_block",

Optionally apply the same NULLIF transformation in makeMetaViewQuery for the "progressBlock" alias if downstream tooling expects NULL instead of -1.

🧹 Nitpick comments (15)
codegenerator/cli/templates/static/codegen/src/db/Db.res (1)

15-15: Avoid logging the connection object in debug hook to prevent secret leakage.

If you enable this, logging connection can expose credentials/DSN. Prefer logging the query (and maybe a connection id) only.

Apply this diff if you plan to turn the hook on:

-  // debug: (~connection, ~query, ~params as _, ~types as _) => Js.log2(connection, query),
+  // debug: (~connection as _, ~query, ~params as _, ~types as _) => Js.log(query),
codegenerator/cli/npm/envio/src/Utils.res (1)

134-136: Document integer-key semantics for Js.Dict.

Js.Dict keys are strings; using int keys stringifies them. Add a short comment to avoid misuse with string-based helpers in the same module.

Apply:

 @set_index
 external setByInt: (dict<'a>, int, 'a) => unit = ""
+// NOTE: Js.Dict keys are strings; int keys are stringified (e.g. 1 -> "1").
codegenerator/cli/templates/static/codegen/src/Index.res (1)

222-223: Consistent mapping in both Synced/Syncing branches — OK.

All branches now source latestProcessedBlock from committedProgressBlockNumber.

You can store hasProcessedToEndblock in a local and reuse it to avoid double calls:

-      let currentBlockHeight =
-        cf->ChainFetcher.hasProcessedToEndblock
+      let currentBlockHeight =
+        hasProcessedToEndblock
           ? cf.endBlock->Option.getWithDefault(cf.currentBlockHeight)
           : cf.currentBlockHeight

Also applies to: 229-229, 234-234, 239-239

codegenerator/cli/npm/envio/src/PgStorage.res (1)

897-913: Cleanup-on-restart executes only when chains exist — good; consider atomicity.

Running makeCleanUpOnRestartQuery after state load is fine. If it performs multiple mutations, wrap in a transaction to avoid partial cleanup on failure.

-    if chains->Utils.Array.notEmpty {
-      let () =
-        await sql->Postgres.unsafe(
-          InternalTable.DynamicContractRegistry.makeCleanUpOnRestartQuery(~pgSchema, ~chains),
-        )
-    }
+    if chains->Utils.Array.notEmpty {
+      let _ =
+        await sql->Postgres.beginSql(sqlInner =>
+          sqlInner->Postgres.unsafe(
+            InternalTable.DynamicContractRegistry.makeCleanUpOnRestartQuery(~pgSchema, ~chains),
+          )
+        )
+    }
scenarios/test_codegen/test/helpers/Mock.res (1)

56-57: Align resolver naming with the new API (consider renaming resolveLoadInitialState).

The field and helper arrays are renamed to “resume…”, but the public resolver is still named resolveLoadInitialState. For consistency and discoverability, prefer resolveResumeInitialState (optionally keep the old name as a temporary alias).

Apply:

 type t = {
-  resumeInitialStateCalls: array<bool>,
-  resolveLoadInitialState: Persistence.initialState => unit,
+  resumeInitialStateCalls: array<bool>,
+  resolveResumeInitialState: Persistence.initialState => unit,
+  /* optional back-compat alias; remove after migration */
+  resolveLoadInitialState: Persistence.initialState => unit,
   ...
 }

 ...

-  resumeInitialStateCalls,
-  resolveLoadInitialState: (initialState: Persistence.initialState) => {
-    resumeInitialStateResolveFns->Js.Array2.forEach(resolve => resolve(initialState))
-  },
+  resumeInitialStateCalls,
+  resolveResumeInitialState: (initialState: Persistence.initialState) => {
+    resumeInitialStateResolveFns->Js.Array2.forEach(resolve => resolve(initialState))
+  },
+  /* optional back-compat alias; remove after migration */
+  resolveLoadInitialState: (initialState: Persistence.initialState) => {
+    resumeInitialStateResolveFns->Js.Array2.forEach(resolve => resolve(initialState))
+  },

Also applies to: 94-96, 103-106, 132-137

codegenerator/cli/templates/static/codegen/src/EventProcessing.res (1)

247-258: Optional: Enrich per-chain logs with next log index and total processed

Including progressNextBlockLogIndex and totalEventsProcessed can simplify debugging and monitoring.

-      byChain->Utils.Dict.setByInt(
+      byChain->Utils.Dict.setByInt(
         data.chainId,
         {
           "batchSize": data.batchSize,
           "toBlockNumber": data.progressBlockNumber,
+          "nextBlockLogIndex": data.progressNextBlockLogIndex,
+          "totalEventsProcessed": data.totalEventsProcessed,
         },
       )
codegenerator/cli/templates/static/codegen/src/IO.res (1)

238-247: Skip no-op DB call when progressedChains is empty

Avoid an unnecessary call if there’s nothing to persist.

-        await Belt.Array.concatMany([
-          [
-            sql =>
-              sql->InternalTable.Chains.setProgressedChains(
-                ~pgSchema=Db.publicSchema,
-                ~progressedChains,
-              ),
-            setRawEvents,
-          ],
-          setEntities,
-        ])
+        await Belt.Array.concatMany([
+          (progressedChains->Array.length > 0
+            ? [
+                [
+                  sql =>
+                    sql->InternalTable.Chains.setProgressedChains(
+                      ~pgSchema=Db.publicSchema,
+                      ~progressedChains,
+                    ),
+                  setRawEvents,
+                ],
+              ]
+            : [[setRawEvents]]),
+          [setEntities],
+        ])
codegenerator/cli/npm/envio/src/Batch.res (1)

99-146: Optimized unordered batch processing for single-chain efficiency.

The unordered batch implementation efficiently groups events by chain to maximize loader optimization opportunities. The use of refs for mutable state is appropriate for performance-critical batch building.

Consider adding a comment explaining why grouping by chain improves loader performance (e.g., cache locality, reduced context switching).

  // Accumulate items for all actively indexing chains
  // the way to group as many items from a single chain as possible
- // This way the loaders optimisations will hit more often
+ // This way the loaders optimizations will hit more often due to
+ // improved cache locality and reduced context switching between chains
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)

111-141: Consider extracting progress chain creation to a helper function.

The progress chain creation logic (lines 111-141) is somewhat complex and could benefit from extraction into a dedicated helper function for improved readability and testability.

+ let createProgressedChain = (
+   chain: ChainMap.Chain.t,
+   chainFetcher: ChainFetcher.t,
+   updatedFetchState: FetchState.t,
+   chainBatchSize: option<int>,
+ ): option<Batch.progressedChain> => {
+   let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
+   if (
+     chainFetcher.committedProgressBlockNumber < nextProgressBlockNumber ||
+     chainBatchSize->Option.isSome
+   ) {
+     let batchSize = chainBatchSize->Option.getWithDefault(0)
+     Some({
+       chainId: chain->ChainMap.Chain.toChainId,
+       batchSize,
+       progressBlockNumber: nextProgressBlockNumber,
+       progressNextBlockLogIndex: updatedFetchState->FetchState.getProgressNextBlockLogIndex,
+       totalEventsProcessed: chainFetcher.numEventsProcessed + batchSize,
+     })
+   } else {
+     None
+   }
+ }

  let progressedChains = []
  chainManager.chainFetchers
  ->ChainMap.entries
  ->Array.forEach(((chain, chainFetcher)) => {
    let updatedFetchState = fetchStates->ChainMap.get(chain)
-   let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
    let maybeItemsCountInBatch =
      sizePerChain->Utils.Dict.dangerouslyGetNonOption(
        chain->ChainMap.Chain.toChainId->Int.toString,
      )
-   if (
-     chainFetcher.committedProgressBlockNumber < nextProgressBlockNumber ||
-       // It should never be 0
-       maybeItemsCountInBatch->Option.isSome
-   ) {
-     let chainBatchSize = maybeItemsCountInBatch->Option.getWithDefault(0)
-     progressedChains
-     ->Js.Array2.push(
-       (
-         {
-           chainId: chain->ChainMap.Chain.toChainId,
-           batchSize: chainBatchSize,
-           progressBlockNumber: nextProgressBlockNumber,
-           progressNextBlockLogIndex: updatedFetchState->FetchState.getProgressNextBlockLogIndex,
-           totalEventsProcessed: chainFetcher.numEventsProcessed + chainBatchSize,
-         }: Batch.progressedChain
-       ),
-     )
-     ->ignore
-   }
+   switch createProgressedChain(chain, chainFetcher, updatedFetchState, maybeItemsCountInBatch) {
+   | Some(progressedChain) => progressedChains->Js.Array2.push(progressedChain)->ignore
+   | None => ()
+   }
  })
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (1)

202-207: Restart math and filter edge-cases: confirm off-by-one and tuple-compare semantics.

  • With restartBlockNumber = progressBlockNumber + 1 and filter (bn, li) > (restartBlockNumber, progressNextBlockLogIndex), you’ll skip everything up to and including progressNextBlockLogIndex in the next block. That matches “processed up to logIndex (inclusive)” semantics. Please confirm that progressNextBlockLogIndex indeed denotes the last processed log index in block progressBlockNumber + 1 (not “next to process”). If it’s “next to process,” change > to >=.
  • ReScript structural comparison on tuples is fine here, but it’s subtle. Consider a short inline comment to lock intent.

Also applies to: 208-224

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)

174-197: Persist only meta-fields; consider id typing consistency.

setMeta builds params with id as a string key. Downstream, InternalTable.Chains.setMeta currently pushes that string as the $1 param. Postgres will coerce, but passing an int is cleaner. See suggested change in InternalTable.res to parse to int before binding.


610-647: Batch post-processing flow looks correct; minor write-amplification note.

EventBatchProcessed now updates progressedChains, then schedules UpdateChainMetaDataAndCheckForExit. With throttling, this is fine. If write pressure shows up, consider updating metadata only when fetchState changes (as the FIXME hints).

codegenerator/cli/npm/envio/src/db/InternalTable.res (3)

196-218: Bind id as int (avoid string coercion at the driver).

Small tweak: parse Dict key (string) to int before binding to $1. Safer and clearer.

-      // Push id first (for WHERE clause)
-      params->Js.Array2.push(chainId->(Utils.magic: string => unknown))->ignore
+      // Push id first (for WHERE clause) — bind as int
+      let chainIdInt =
+        switch Belt.Int.fromString(chainId) {
+        | Some(id) => id
+        | None => Js.Exn.raiseError("Invalid chain id in setMeta")
+        }
+      params->Js.Array2.push(chainIdInt->(Utils.magic: int => unknown))->ignore

220-251: Batch progress updates: consider single statement or transaction.

Current implementation issues N updates in parallel. For many chains this increases round-trips and risks partial writes on failure. Consider:

  • Using a single UPDATE with FROM (VALUES ...) and matching on id, or
  • Wrapping the per-chain updates in an explicit transaction.

Happy to provide a VALUES-based UPDATE if desired.


430-447: Indexing advice for cleanup path.

Consider a composite index on:

  • dynamic_contract_registry(chain_id, registering_event_block_number, registering_event_log_index)
  • dynamic_contract_registry_history(entity_history_chain_id, entity_history_block_number, entity_history_log_index)

This will make the restart cleanup O(log n) and predictable on large datasets.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e35df5e and 313325f.

📒 Files selected for processing (22)
  • codegenerator/cli/npm/envio/src/Batch.res (1 hunks)
  • codegenerator/cli/npm/envio/src/FetchState.res (1 hunks)
  • codegenerator/cli/npm/envio/src/Persistence.res (2 hunks)
  • codegenerator/cli/npm/envio/src/PgStorage.res (5 hunks)
  • codegenerator/cli/npm/envio/src/Throttler.res (1 hunks)
  • codegenerator/cli/npm/envio/src/Utils.res (1 hunks)
  • codegenerator/cli/npm/envio/src/bindings/Postgres.res (1 hunks)
  • codegenerator/cli/npm/envio/src/db/InternalTable.gen.ts (0 hunks)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res (9 hunks)
  • codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/IO.res (2 hunks)
  • codegenerator/cli/templates/static/codegen/src/InMemoryStore.res (0 hunks)
  • codegenerator/cli/templates/static/codegen/src/Index.res (2 hunks)
  • codegenerator/cli/templates/static/codegen/src/db/Db.res (1 hunks)
  • codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res (0 hunks)
  • codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js (0 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (7 hunks)
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4 hunks)
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (8 hunks)
  • scenarios/test_codegen/test/helpers/Mock.res (4 hunks)
  • scenarios/test_codegen/test/lib_tests/Persistence_test.res (5 hunks)
💤 Files with no reviewable changes (4)
  • codegenerator/cli/templates/static/codegen/src/InMemoryStore.res
  • codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js
  • codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.gen.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/templates/static/codegen/src/db/Db.res
  • codegenerator/cli/npm/envio/src/bindings/Postgres.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/npm/envio/src/Throttler.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
  • scenarios/test_codegen/test/lib_tests/Persistence_test.res
  • codegenerator/cli/npm/envio/src/Persistence.res
  • codegenerator/cli/templates/static/codegen/src/IO.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • scenarios/test_codegen/test/helpers/Mock.res
  • codegenerator/cli/npm/envio/src/PgStorage.res
  • codegenerator/cli/templates/static/codegen/src/Index.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/templates/static/codegen/src/db/Db.res
  • codegenerator/cli/npm/envio/src/bindings/Postgres.res
  • codegenerator/cli/npm/envio/src/FetchState.res
  • codegenerator/cli/npm/envio/src/Throttler.res
  • codegenerator/cli/npm/envio/src/Utils.res
  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
  • scenarios/test_codegen/test/lib_tests/Persistence_test.res
  • codegenerator/cli/npm/envio/src/Persistence.res
  • codegenerator/cli/templates/static/codegen/src/IO.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • scenarios/test_codegen/test/helpers/Mock.res
  • codegenerator/cli/npm/envio/src/PgStorage.res
  • codegenerator/cli/templates/static/codegen/src/Index.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
  • codegenerator/cli/npm/envio/src/Batch.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/db/Db.res
  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
  • codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs
  • codegenerator/cli/templates/static/codegen/src/IO.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/templates/static/codegen/src/Index.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
  • codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (34)
codegenerator/cli/npm/envio/src/bindings/Postgres.res (1)

81-81: Confirm debug callback signature parity
The 4-parameter signature aligns with postgres.js’s (connection, query, params, types)—verify this against the specific postgres version targeted by your generated bindings.

codegenerator/cli/npm/envio/src/Throttler.res (1)

33-36: LGTM: better error logging via prettified exn.

Passing exn->Utils.prettifyExn reduces noise and standardizes logged errors.

codegenerator/cli/npm/envio/src/FetchState.res (2)

1282-1287: Clamp logIndex before decrement
Change to return None when item.logIndex === 0 to avoid Some(-1), which downstream may treat as invalid.

 let getProgressNextBlockLogIndex = ({queue}: t) => {
-  switch queue->Utils.Array.last {
-  | Some(item) => Some(item.logIndex - 1)
-  | None => None
-  }
+  switch queue->Utils.Array.last {
+  | Some(item) => item.logIndex === 0 ? None : Some(item.logIndex - 1)
+  | None => None
+  }
 }

1275-1280: No action required – off-by-one confirmed
Verified that subtracting 1 from the last queued blockNumber correctly yields “last processed” for:

  • ChainFetcher restart (progressBlockNumber + 1 ⇒ first unprocessed block)
  • EventProcessing RPC (toBlockNumber ≤ last fully processed)
  • Database persistence/deletion logic (stores true last processed block)
  • Prometheus metrics (reports committed progress)
    All consumers expect this semantics.
codegenerator/cli/npm/envio/src/Persistence.res (2)

35-35: API rename to resumeInitialState looks good.

Type shape remains consistent; downstream call sites should now reference resumeInitialState.


144-145: Correctly switching Ready() hydration to resumeInitialState.

This aligns the init fast-path with the new API and avoids doing a second initialize().

scenarios/test_codegen/test/lib_tests/Persistence_test.res (6)

5-5: Mock surface updated to include #resumeInitialState — good.

Keeps the test aligned with the new storage API.


61-62: Assertion now checks resumeInitialStateCalls — good.

Matches the renamed API and preserves intent.


101-102: Second-phase assertion updated to resumeInitialStateCalls — good.

Keeps the regression coverage intact.


129-130: Minimal mock when skipping initialize — good.

Constraining to [#isInitialized, #resumeInitialState] captures the intended path.


158-160: Assertion for resumeInitialStateCalls length is correct.

Confirms resume path is executed exactly once.


141-147: Keep using resolveLoadInitialState; the mock helper doesn’t define resolveResumeInitialState. Tests correctly call storageMock.resolveLoadInitialState; only rename to resolveResumeInitialState after updating scenarios/test_codegen/test/helpers/Mock.res to expose that method.

Likely an incorrect or invalid review comment.

codegenerator/cli/templates/static/codegen/src/Index.res (1)

203-206: Switch to committedProgressBlockNumber mapped into latestProcessedBlock — OK.

The UI contract stays stable while adopting the new internal progress model.

Also applies to: 212-212

codegenerator/cli/npm/envio/src/PgStorage.res (2)

673-676: Initialization guard considers both old and new sentinel tables — OK.

Covers pre-envio@2.28 (“event_sync_state”) and new Chains table presence.


922-931: Public surface exports resumeInitialState — OK.

Matches Persistence.storage type and test/mocks.

scenarios/test_codegen/test/helpers/Mock.res (1)

39-45: LGTM: Public API switched to #resumeInitialState

Variant rename looks consistent with the new flow.

codegenerator/cli/templates/static/codegen/src/EventProcessing.res (1)

239-265: LGTM: Switched logging to progressedChains-derived byChain

Building byChain from progressedChains and logging it is a good fit for the new progress model.

codegenerator/cli/templates/static/codegen/src/IO.res (2)

51-58: LGTM: executeBatch signature updated to accept progressedChains

Signature change is clear and matches upstream usage.


238-249: LGTM: Write envio_chains progress within the same transaction

Updating InternalTable.Chains before other writes keeps progress atomic with batch persistence.

codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs (1)

424-424: LGTM! Storage API rename aligns with PR objectives.

The rename from loadInitialState to resumeInitialState properly reflects the PR's new per-chain progress model. This change is consistently applied across the codebase.

codegenerator/cli/npm/envio/src/Batch.res (4)

1-7: Well-structured progress tracking type.

The progressedChain type properly captures the essential fields for tracking indexer progress per chain. The optional progressNextBlockLogIndex field correctly handles cases where partial block processing may occur.


21-29: Clean comparator extraction.

The function properly extracts event comparator components from the event item and delegates to the centralized EventUtils.getEventComparator for consistent ordering across the codebase.


71-97: Efficient ordered batch processing with proper side-effect management.

The function correctly implements ordered multi-chain event processing:

  1. Iterates until batch size is reached
  2. Pops items only when actual events exist (not NoItem)
  3. Updates per-chain statistics via dictionary mutations
  4. Uses tail recursion for efficiency

The side effects (array push and dictionary updates) are well-contained and necessary for performance.


31-41: No changes required for NoItem comparator The hardcoded logIndex of 0 is safe given that multi‐chain comparisons use the chainId tuple element and same‐chain ordering defers to FetchState.qItemLt, which already treats NoItem before any Item.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (3)

60-72: Consistent parameter naming improves API clarity.

The parameter rename from self to chainManager across all public functions improves consistency and readability.


88-180: Well-architected batch creation with comprehensive progress tracking.

The createBatch function effectively:

  1. Creates defensive copies of fetch states to avoid mutations
  2. Delegates to appropriate batching strategy (ordered vs unordered)
  3. Computes per-chain progress accurately
  4. Maintains proper separation of concerns between ChainManager and Batch modules

The progress tracking logic correctly handles both block advancement and within-block event counts.


192-205: Clean extraction of safe reorg information.

The function properly collects chain IDs and their corresponding safe block numbers for reorg handling.

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (2)

21-21: Good shift to an explicit, monotonic progress marker.

Using committedProgressBlockNumber (plumbed from progressBlockNumber) clarifies semantics and removes None/Some branching around “latest processed.” LGTM.

Also applies to: 157-158


442-446: Endblock completion now keyed off committedProgressBlockNumber.

This is the right source of truth post-refactor. LGTM.

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)

916-926: No-items progress-only path: DB update then dispatch is correct.

Persisting progressedChains when items is empty keeps progress durable. LGTM.


952-961: Per-chain Processing metrics wiring looks good.

Zeroing batch size when a chain is absent from progressedChains prevents stale gauge values. LGTM.

codegenerator/cli/npm/envio/src/db/InternalTable.res (3)

9-14: Schema refactor is coherent.

Introducing progressFields and folding them into Chains.field keeps DDL and API aligned. Field order in fields matches initialFromConfig and VALUES builder. LGTM.

Also applies to: 16-16, 28-41


53-61: Type t + initial values align with the new semantics.

  • progressBlockNumber = -1 sentinel and optional progressNextBlockLogIndex are sensible.
  • events_processed placed alongside progress_fields is clear. LGTM.

Also applies to: 89-101, 116-121


155-176: Meta-fields update query builder looks solid.

Static field list with positional params avoids stringly-typed mistakes. LGTM.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
codegenerator/cli/npm/envio/src/db/InternalTable.res (1)

458-493: Critical: -1 progress still purges whole chain; also tighten boundary and clarify precedence

As written, when progressBlockNumber = -1 the switch returns an empty suffix, yielding DELETE … WHERE chain_id = X; — a full-chain purge. Move the entire DELETE into the switch so the -1 branch emits nothing. Also:

  • Use parentheses around the OR/AND group for clarity.
  • Use >= for the log-index boundary if progressNextBlockLogIndex represents “next to process” to avoid retaining unprocessed rows.
-      query :=
-        query.contents ++
-        `DELETE FROM "${pgSchema}"."${table.tableName}"
-WHERE chain_id = ${chain.id->Belt.Int.toString}${switch chain {
-          | {progressBlockNumber: -1} => ``
-          | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
-            ` AND registering_event_block_number > ${progressBlockNumber->Belt.Int.toString}`
-          | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
-            ` AND (
-  registering_event_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
-  OR registering_event_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
-  AND registering_event_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
-)`
-          }};`
+      query :=
+        query.contents
+        ++ switch chain {
+        | {progressBlockNumber: -1} => `` /* No cleanup on fresh start */
+        | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
+          `DELETE FROM "${pgSchema}"."${table.tableName}"
+WHERE chain_id = ${chain.id->Belt.Int.toString}
+  AND registering_event_block_number > ${progressBlockNumber->Belt.Int.toString};`
+        | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
+          `DELETE FROM "${pgSchema}"."${table.tableName}"
+WHERE chain_id = ${chain.id->Belt.Int.toString}
+  AND (
+    registering_event_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
+    OR (
+      registering_event_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
+      AND registering_event_log_index >= ${progressNextBlockLogIndex->Belt.Int.toString}
+    )
+  );`
+        }`
@@
-      query :=
-        query.contents ++
-        `DELETE FROM "${pgSchema}"."${table.tableName}_history"
-WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}${switch chain {
-          | {progressBlockNumber: -1} => ``
-          | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
-            ` AND entity_history_block_number > ${progressBlockNumber->Belt.Int.toString}`
-          | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
-            ` AND (
-  entity_history_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
-  OR entity_history_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
-  AND entity_history_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
-)`
-          }};`
+      query :=
+        query.contents
+        ++ switch chain {
+        | {progressBlockNumber: -1} => `` /* No cleanup on fresh start */
+        | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
+          `DELETE FROM "${pgSchema}"."${table.tableName}_history"
+WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}
+  AND entity_history_block_number > ${progressBlockNumber->Belt.Int.toString};`
+        | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
+          `DELETE FROM "${pgSchema}"."${table.tableName}_history"
+WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}
+  AND (
+    entity_history_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
+    OR (
+      entity_history_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
+      AND entity_history_log_index >= ${progressNextBlockLogIndex->Belt.Int.toString}
+    )
+  );`
+        }`

If your semantics are “log index last processed” instead of “next to process,” keep > and drop the = in the proposed diff. Confirm which contract you want.

🧹 Nitpick comments (4)
codegenerator/cli/npm/envio/src/db/InternalTable.res (3)

155-176: Param order coupling: keep metaFields and builders in lockstep

makeMetaFieldsUpdateQuery assumes metaFields order. Add a brief comment warning future editors that reordering metaFields changes the UPDATE parameter order.

   let metaFields: array<field> = [
+    // WARNING: Order matters. makeMetaFieldsUpdateQuery and setMeta rely on this exact sequence.
     #source_block,
     #buffer_block,
     #first_event_block,

196-215: Use int for WHERE id param in setMeta to avoid implicit casts

Currently pushing chainId as a string. Push an int to match column type and avoid implicit text→int casts.

-      // Push id first (for WHERE clause)
-      params->Js.Array2.push(chainId->(Utils.magic: string => unknown))->ignore
+      // Push id first (for WHERE clause) — parse to int to match column type
+      switch Belt.Int.fromString(chainId) {
+      | Some(id) => params->Js.Array2.push(id->(Utils.magic: int => unknown))->ignore
+      | None => Js.Exn.raiseError("Invalid chain id in setMeta")
+      }

178-194: Progress updates path is clear; consider batching to reduce round-trips

Per-chain UPDATEs are fine for small N. If N grows, a single UPDATE … FROM (VALUES …) with CASE or a temp table would cut round-trips.

If you expect >100 chains per tick, I can provide a VALUES-based bulk UPDATE snippet.

Also applies to: 220-251

codegenerator/cli/templates/static/codegen/src/EventProcessing.res (1)

247-258: Type the byChain payload built from progressedChains (and optionally surface progress-only chains)

The payload shape is implicit. Add a small object type for compile-time safety. Optional: if you want logs to reflect chains that advanced with zero events, include them with a flag instead of filtering them out.

Apply within this hunk:

-  let byChain = Js.Dict.empty()
+  let byChain: Js.Dict.t<ProgressLog> = Js.Dict.empty()

Add this type (outside this hunk, e.g., near existing types):

type ProgressLog = {
  "batchSize": int,
  "toBlockNumber": int,
}

Optional alternative (to log progress-only chains):

-  progressedChains->Js.Array2.forEach(data => {
-    if data.batchSize > 0 {
-      byChain->Utils.Dict.setByInt(
-        data.chainId,
-        {
-          "batchSize": data.batchSize,
-          "toBlockNumber": data.progressBlockNumber,
-        },
-      )
-    }
-  })
+  progressedChains->Js.Array2.forEach(data => {
+    byChain->Utils.Dict.setByInt(
+      data.chainId,
+      {
+        "batchSize": data.batchSize,
+        "toBlockNumber": data.progressBlockNumber,
+      },
+    )
+  })

Note: The existing type logPartitionInfo above appears unused with this new payload; consider removing or repurposing it to avoid drift.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 313325f and 10ec0fe.

📒 Files selected for processing (2)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res (9 hunks)
  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res (2 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/EventProcessing.res
🔇 Additional comments (8)
codegenerator/cli/npm/envio/src/db/InternalTable.res (5)

9-14: Good factoring: dedicated progressFields union

Splitting progress fields into their own union keeps types clean and reduces drift across query builders.

Also applies to: 16-16


35-41: Schema additions look consistent; confirm migration strategy for non-null columns

events_processed, progress_block, and _progress_log_index are wired into both the field list and table definition. Since progress_block and events_processed are non-nullable, ensure your migration adds them with sane defaults/backfill on existing installations to avoid ALTER TABLE failures.

I can draft a reversible SQL migration if helpful.

Also applies to: 89-104


43-51: metaFields shape aligns with view/use sites

Block/source/buffer/ready fields grouped as meta is clear.


53-61: Chain record defaults are sensible

Initializing progressBlockNumber=-1 and _progress_log_index=NULL matches the “no progress yet” contract.

Also applies to: 116-118


364-370: Views: progress mapped correctly

Exposing progress_block in _meta and as latest_processed_block in chain_metadata is consistent.

Also applies to: 384-385

codegenerator/cli/templates/static/codegen/src/EventProcessing.res (3)

259-265: LGTM: logger payload now derives from progressedChains

Using byChain assembled from progressedChains makes the trace precise per chain. Looks good.


286-295: Please verify that IO.executeBatch wraps both entity writes and progressedChains updates in a single database transaction
The code generator’s executeBatch calls Db.sql->IO.executeBatch, but a cursory grep in IO.res shows no explicit BEGIN/COMMIT or sql.begin around both the entity inserts and the envio_chains progress update. You must confirm (e.g. by inspecting IO.res for the transaction boundary in executeBatch) that IO.executeBatch uses one transaction for:

  • flushing all batched entities
  • updating envio_chains progress

If it doesn’t, wrap both operations in the same transaction to prevent partial progress on failures.


238-246: All call sites updated and zero‐event chains still reach DB The only active invocation in GlobalState.res correctly passes ~progressedChains, and the full array (including zero‐batch entries) is forwarded to executeBatch for DB updates; logging filters out zeros as intended.

@DZakh DZakh requested a review from JonoPrest September 3, 2025 13:17
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
codegenerator/cli/npm/envio/src/db/InternalTable.res (1)

458-495: Skip full-table DELETE when progressBlockNumber = -1.

The current implementation doesn't skip DELETE operations when progressBlockNumber = -1, which could lead to unintended full-table purges on fresh deployments or after migrations.

Apply this diff to fix the issue:

 let makeCleanUpOnRestartQuery = (~pgSchema, ~chains: array<Chains.t>) => {
   let query = ref(``)
 
   chains->Js.Array2.forEach(chain => {
     if query.contents !== "" {
       query := query.contents ++ "\n"
     }
-    query :=
-      query.contents ++
-      `DELETE FROM "${pgSchema}"."${table.tableName}"
-WHERE chain_id = ${chain.id->Belt.Int.toString}${switch chain {
-        | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
-          ` AND (
-  registering_event_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
-  OR registering_event_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
-  AND registering_event_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
-)`
-        | {progressBlockNumber: -1, progressNextBlockLogIndex: Null} => ``
-        | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
-          ` AND registering_event_block_number > ${progressBlockNumber->Belt.Int.toString}`
-        }};
-DELETE FROM "${pgSchema}"."${table.tableName}_history"
-WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}${switch chain {
-        | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
-          ` AND (
-  entity_history_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
-  OR entity_history_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
-  AND entity_history_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
-)`
-        | {progressBlockNumber: -1, progressNextBlockLogIndex: Null} => ``
-        | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
-          ` AND entity_history_block_number > ${progressBlockNumber->Belt.Int.toString}`
-        }};`
+    query :=
+      query.contents
+      ++ switch chain {
+      | {progressBlockNumber: -1, progressNextBlockLogIndex: Null} => ``
+      | {progressBlockNumber, progressNextBlockLogIndex: Value(progressNextBlockLogIndex)} =>
+        `DELETE FROM "${pgSchema}"."${table.tableName}"
+WHERE chain_id = ${chain.id->Belt.Int.toString}
+  AND (
+    registering_event_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
+    OR (
+      registering_event_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
+      AND registering_event_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
+    )
+  );
+DELETE FROM "${pgSchema}"."${table.tableName}_history"
+WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}
+  AND (
+    entity_history_block_number > ${(progressBlockNumber + 1)->Belt.Int.toString}
+    OR (
+      entity_history_block_number = ${(progressBlockNumber + 1)->Belt.Int.toString}
+      AND entity_history_log_index > ${progressNextBlockLogIndex->Belt.Int.toString}
+    )
+  );`
+      | {progressBlockNumber, progressNextBlockLogIndex: Null} =>
+        `DELETE FROM "${pgSchema}"."${table.tableName}"
+WHERE chain_id = ${chain.id->Belt.Int.toString}
+  AND registering_event_block_number > ${progressBlockNumber->Belt.Int.toString};
+DELETE FROM "${pgSchema}"."${table.tableName}_history"
+WHERE entity_history_chain_id = ${chain.id->Belt.Int.toString}
+  AND entity_history_block_number > ${progressBlockNumber->Belt.Int.toString};`
+      }
   })
 
   query.contents
 }
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (1)

221-241: Guard against regressive progress updates.

The current implementation could apply regressive updates if a chain's progress data has lower values than already committed. This could happen with out-of-order batch completions.

Apply this guard:

 let cf = switch maybeProgressData {
 | Some(progressData) => {
+    if (
+      progressData.progressBlockNumber < cf.committedProgressBlockNumber ||
+      progressData.totalEventsProcessed < cf.numEventsProcessed
+    ) {
+      cf
+    } else {
       if cf.committedProgressBlockNumber !== progressData.progressBlockNumber {
         Prometheus.ProgressBlockNumber.set(
           ~blockNumber=progressData.progressBlockNumber,
           ~chainId=chain->ChainMap.Chain.toChainId,
         )
       }
       if cf.numEventsProcessed !== progressData.totalEventsProcessed {
         Prometheus.ProgressEventsCount.set(
           ~processedCount=progressData.totalEventsProcessed,
           ~chainId=chain->ChainMap.Chain.toChainId,
         )
       }
       {
         ...cf,
         committedProgressBlockNumber: progressData.progressBlockNumber,
         numEventsProcessed: progressData.totalEventsProcessed,
       }
+    }
   }
 | None => cf
 }
🧹 Nitpick comments (6)
scenarios/test_codegen/test/rollback/Rollback_test.res (1)

329-336: Use consistent variant namespacing in assertions
Replace the qualified GlobalState.UpdateEndOfBlockRangeScannedData at lines 334 and 373 with the unqualified UpdateEndOfBlockRangeScannedData to match earlier assertions.

scenarios/test_codegen/test/ChainManager_test.res (1)

95-101: Mock updated to progress model — sentinel clarity

committedProgressBlockNumber: -1 matches the new non-optional field. Consider a named constant (e.g., PROGRESS_NOT_STARTED = -1) to make intent explicit in tests.

scenarios/test_codegen/test/lib_tests/PgStorage_test.res (3)

147-170: Schema/view changes for progress fields look consistent

  • progress_block (NOT NULL) and _progress_log_index added to envio_chains.
  • _meta exposes progressBlock and sourceBlock; chain_metadata.latest_processed_block now derives from progress_block.
    Seed inserts updated accordingly.

Consider adding lightweight constraints for data hygiene:

  • CHECK (progress_block >= -1)
  • CHECK (_progress_log_index IS NULL OR _progress_log_index >= 0)

Also applies to: 187-190


582-584: Initial insert values include progress defaults

Initial progress_block=-1 and _progress_log_index=NULL align with “not started” semantics; consistent across single, no-endBlock, and multi-chain cases.

Document these sentinel defaults in code comments near the generator to help future maintainers.

Also applies to: 609-611, 645-648


659-767: Restart cleanup SQL is precise across progress states

The four-case DELETEs correctly compute boundaries:

  • (-1, NULL): wipe all for chain.
  • (-1, 0): keep (0,0) and earlier.
  • (10, NULL): remove > 10.
  • (10, 5): remove > 11 or (=11 and log_index > 5).
    Parentheses guard precedence; history tables mirrored.

If not already present, an index on (chain_id, registering_event_block_number, registering_event_log_index) (and matching history fields) will make these cleanup queries scale.

codegenerator/cli/npm/envio/src/db/InternalTable.res (1)

196-218: Consider adding error handling for database operations.

The database update operations in setMeta could fail. Consider wrapping them in try-catch or returning a Result type to handle potential database errors gracefully.

 let setMeta = (sql, ~pgSchema, ~chainsData: dict<metaFields>) => {
   let query = makeMetaFieldsUpdateQuery(~pgSchema)
 
   let promises = []
 
   chainsData->Utils.Dict.forEachWithKey((chainId, data) => {
     let params = []
 
     // Push id first (for WHERE clause)
     params->Js.Array2.push(chainId->(Utils.magic: string => unknown))->ignore
 
     // Then push all updateable field values (for SET clause)
     metaFields->Js.Array2.forEach(field => {
       let value =
         data->(Utils.magic: metaFields => dict<unknown>)->Js.Dict.unsafeGet((field :> string))
       params->Js.Array2.push(value)->ignore
     })
 
-    promises->Js.Array2.push(sql->Postgres.preparedUnsafe(query, params->Obj.magic))->ignore
+    promises->Js.Array2.push(
+      sql->Postgres.preparedUnsafe(query, params->Obj.magic)
+      ->Promise.catch(err => {
+        Logging.error({
+          "msg": "Failed to update chain metadata",
+          "chainId": chainId,
+          "error": err
+        })
+        Promise.reject(err)
+      })
+    )->ignore
   })
 
   Promise.all(promises)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 10ec0fe and c49df32.

📒 Files selected for processing (9)
  • codegenerator/cli/npm/envio/src/FetchState.res (1 hunks)
  • codegenerator/cli/npm/envio/src/db/InternalTable.res (9 hunks)
  • codegenerator/cli/templates/static/blank_template/shared/.gitignore (0 hunks)
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (8 hunks)
  • scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res (3 hunks)
  • scenarios/test_codegen/test/ChainManager_test.res (3 hunks)
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res (1 hunks)
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res (15 hunks)
  • scenarios/test_codegen/test/rollback/Rollback_test.res (1 hunks)
💤 Files with no reviewable changes (1)
  • codegenerator/cli/templates/static/blank_template/shared/.gitignore
🚧 Files skipped from review as they are similar to previous changes (1)
  • codegenerator/cli/npm/envio/src/FetchState.res
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res
  • scenarios/test_codegen/test/rollback/Rollback_test.res
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/ChainManager_test.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res
  • scenarios/test_codegen/test/rollback/Rollback_test.res
  • scenarios/test_codegen/test/lib_tests/PgStorage_test.res
  • scenarios/test_codegen/test/lib_tests/FetchState_test.res
  • scenarios/test_codegen/test/ChainManager_test.res
  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
  • codegenerator/cli/npm/envio/src/db/InternalTable.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src

Files:

  • codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build_and_test
🔇 Additional comments (17)
scenarios/test_codegen/test/ChainManager_test.res (2)

172-179: Comparator moved: assertion reads correctly

Switch to Batch.getComparitorFromItem looks correct and keeps ordering semantics intact.


226-231: Public API relocation acknowledged

Using Batch.getOrderedNextItem instead of ChainManager.getOrderedNextItem is aligned with the refactor.

scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)

3051-3129: Fix one assertion message (no duplicate suites found)

Confirmed no duplicate describe("FetchState progress tracking") suite exists. Apply this nit:

@@
   it("When queue has a single item with non 0 log index", () => {
@@
     Assert.equal(
       fetchStateSingleItem->FetchState.getProgressNextBlockLogIndex,
-      Some(4),
-      ~message="Should return None when queue has a single item",
+      Some(4),
+      ~message="Should return Some(logIndex - 1) when queue has a single non-zero logIndex item",
     )
scenarios/test_codegen/test/lib_tests/PgStorage_test.res (3)

232-233: Minimal-config DDL/view updates mirror main case

The minimal path reflects the same progress_block/source_block changes; good.

Also applies to: 241-247, 258-259


528-538: Progress-fields update path is correct

makeProgressFieldsUpdateQuery updates exactly progress_block, _progress_log_index, and events_processed. Clear separation; safer for partial writes.


507-517: Verify parameter binding order in callers
makeMetaFieldsUpdateQuery emits placeholders in this exact sequence:

  1. id ($1)
  2. source_block ($2)
  3. buffer_block ($3)
  4. first_event_block ($4)
  5. ready_at ($5)
  6. _is_hyper_sync ($6)
  7. _num_batches_fetched ($7)

I didn’t find any direct execute calls passing this query—please manually confirm that all callers (e.g. the setMeta function in codegenerator/cli/npm/envio/src/db/InternalTable.res) bind arguments in the above order to avoid swapped values.

codegenerator/cli/npm/envio/src/db/InternalTable.res (3)

9-14: LGTM!

The introduction of progressFields as a separate type provides clear separation between progress tracking and other chain fields.


53-61: LGTM! Clean restructuring of chain types.

The type composition using spread operator for metaFields makes the code more maintainable and clearly separates metadata from progress tracking fields.


116-117: LGTM!

Properly initialized progress tracking fields with -1 indicating no progress has been made yet.

scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res (3)

142-163: LGTM! Good test isolation pattern.

Creating a test-specific persistence with transaction-scoped SQL client ensures proper rollback capability and test isolation.


269-272: LGTM! Correctly updated to use new progress tracking fields.

The SQL update properly targets the new progress_block and _progress_log_index fields using the InternalTable field constants.


304-304: Good practice with error prettification.

Using prettifyExn ensures that error messages are more readable in test output.

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (5)

133-133: LGTM! Clean transition to progress-based tracking.

The EventBatchProcessed action now correctly uses progressedChains for tracking batch processing progress.


174-191: LGTM! Proper migration to metadata fields structure.

The updateChainMetadataTable function correctly builds chainsData as a dict of metaFields and uses the new setMeta API.


610-646: LGTM! Clean integration of progress tracking.

The EventBatchProcessed handler correctly updates chain manager with progressed chains and determines exit conditions.


915-925: LGTM! Good handling of events-free progress.

Correctly handles the case where chains progress without processing any events, updating only the progress metadata.


951-961: LGTM! Clean metrics derivation from progressedChains.

Prometheus metrics are properly updated based on progressedChains data, with appropriate fallback to zero for chains without progress.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
codegenerator/cli/npm/envio/src/Persistence.res (1)

145-159: Unify checkpoint shape and remove unsafe Utils.magic.

Current code logs mixed value types in checkpoints (object when Value(_), bare number when Null) and relies on Utils.magic. Prefer a consistent object with nullable logIndex to simplify downstream parsing and avoid unsafe casts.

Apply:

-          initialState.chains->Js.Array2.forEach(c => {
-            let checkpoint = switch c.progressNextBlockLogIndex {
-            | Value(
-                logIndex,
-              ) => // Latest processed log index (not necessarily processed by the indexer)
-              {
-                "blockNumber": c.progressBlockNumber + 1,
-                "logIndex": logIndex,
-              }
-            | Null =>
-              // Or simply the latest processed block number (might be -1 if not set)
-              c.progressBlockNumber->Utils.magic
-            }
-            checkpoints->Utils.Dict.setByInt(c.id, checkpoint)
-          })
+          initialState.chains->Js.Array2.forEach(c => {
+            let hasNextLog =
+              switch c.progressNextBlockLogIndex { | Value(_) => true | Null => false }
+            let checkpoint = {
+              "blockNumber":
+                hasNextLog ? c.progressBlockNumber + 1 : c.progressBlockNumber,
+              "logIndex":
+                switch c.progressNextBlockLogIndex {
+                | Value(logIndex) => Js.Nullable.return(logIndex)
+                | Null => Js.Nullable.null
+                },
+            }
+            checkpoints->Utils.Dict.setByInt(c.id, checkpoint)
+          })

Optionally, derive via the shared helpers (keeps semantics centralized):

let bn = FetchState.getProgressBlockNumber(c)
let liOpt = FetchState.getProgressNextBlockLogIndex(c)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c49df32 and 8b9a978.

📒 Files selected for processing (1)
  • codegenerator/cli/npm/envio/src/Persistence.res (3 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{res,resi}

📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)

**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using ref function.
ReScript has record types which require a type definition before hand. You can access record fields by dot like foo.myField.
It's also possible to define an inline object, it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.

Files:

  • codegenerator/cli/npm/envio/src/Persistence.res
**/*.res

📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)

Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored

Files:

  • codegenerator/cli/npm/envio/src/Persistence.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: aarch64-unknown-linux-gnu (ubuntu-22.04)
  • GitHub Check: aarch64-apple-darwin (macos-latest)
  • GitHub Check: x86_64-apple-darwin (macos-latest)
  • GitHub Check: x86_64-unknown-linux-musl (ubuntu-22.04)
🔇 Additional comments (3)
codegenerator/cli/npm/envio/src/Persistence.res (3)

131-131: Message tweak LGTM.

Clearer signal when initialization completes.


146-153: Confirm off-by-one semantics for block+log checkpoint.

You set blockNumber = progressBlockNumber + 1 when progressNextBlockLogIndex is present, and keep logIndex as-is. Please confirm whether progressNextBlockLogIndex represents:

  • the next log index to process (in which case logIndex is already “next”), or
  • the last processed log index (in which case you likely want logIndex + 1).

Align the comment and the fields accordingly, or switch to the centralized helpers to avoid drift.


35-35: Storage interface update verified

All storage backends implement resumeInitialState and no calls to loadInitialState remain; consider adding @raises("StorageError") to resumeInitialState for consistency.

Comment on lines +1 to +7
type progressedChain = {
chainId: int,
batchSize: int,
progressBlockNumber: int,
progressNextBlockLogIndex: option<int>,
totalEventsProcessed: int,
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just annotate this

type t = {
items: array<Internal.eventItem>,
progressedChains: array<progressedChain>,
fetchStates: ChainMap.t<FetchState.t>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to updatedFetchStates

Copy link
Collaborator

@JonoPrest JonoPrest left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥇

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants