Always include all block events to a batch #753
Conversation
Walkthrough
Refactors batching and fetch-state internals: queue → buffer model, new readiness helpers, per-chain batch sizing, and target-based batch preparation. Removes the progress log index from the schema and resume paths. Introduces a config-driven batchSize, propagates it through ChainManager/GlobalState/ChainFetcher, and updates the Batch API/shape, utilities, and tests accordingly.
Sequence Diagram(s)
sequenceDiagram
  autonumber
  participant App as GlobalState
  participant CM as ChainManager
  participant CF as ChainFetcher (per chain)
  participant B as Batch
  participant FS as FetchState (buffer)
  App->>CM: createBatch(~batchSizeTarget=config.batchSize)
  CM->>CF: getFetchStates()
  CF-->>CM: ChainMap<FetchState>
  CM->>B: prepareOrderedBatch/prepareUnorderedBatch(~batchSizeTarget, ~fetchStates, ~mutBatchSizePerChain)
  B->>FS: hasOrderedReadyItem/hasUnorderedReadyItem
  B->>FS: getTimestampAt / getReadyItemsCount
  FS-->>B: readiness and counts
  B-->>CM: {items, progressedChains, updatedFetchStates, dcsToStoreByChainId, creationTimeMs}
  CM-->>App: Batch result
  App->>App: dispatch UpdateQueues(updatedFetchStates)
  App->>Metrics: set ProcessingMaxBatchSize=config.batchSize
sequenceDiagram
  autonumber
  participant DB as InternalTable.Chains
  participant Pers as Persistence
  note over DB,Pers: Progress tracking (schema/resume)
  DB-x DB: Remove _progress_log_index field
  Pers->>Pers: checkpoints := progressBlockNumber
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (6)
scenarios/test_codegen/test/ChainManager_test.res (1)
157-157: Outdated arg name: createBatch now expects ~batchSizeTarget
ChainManager.createBatch was migrated to ~batchSizeTarget. Update this call site, otherwise the test won't compile. Apply:
- let eventsInBlock = ChainManager.createBatch(chainManager, ~maxBatchSize=10000)
+ let eventsInBlock = ChainManager.createBatch(chainManager, ~batchSizeTarget=10000)
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)
2385-2388: Rename arg: filterAndSortForUnorderedBatch now takes ~batchSizeTarget. Tests still call ~maxBatchSize, which will fail after the API change. Apply:
- let prepared = FetchState.filterAndSortForUnorderedBatch(
-   [fsLate, fsExcluded, fsEarly],
-   ~maxBatchSize=3,
- )
+ let prepared = FetchState.filterAndSortForUnorderedBatch(
+   [fsLate, fsExcluded, fsEarly],
+   ~batchSizeTarget=3,
+ )
- let prepared = FetchState.filterAndSortForUnorderedBatch(
-   [fsHalfEarlier, fsFullLater],
-   ~maxBatchSize=2,
- )
+ let prepared = FetchState.filterAndSortForUnorderedBatch(
+   [fsHalfEarlier, fsFullLater],
+   ~batchSizeTarget=2,
+ )
- let prepared = FetchState.filterAndSortForUnorderedBatch(
-   [fsHalfEarlier, fsExactFull],
-   ~maxBatchSize=2,
- )
+ let prepared = FetchState.filterAndSortForUnorderedBatch(
+   [fsHalfEarlier, fsExactFull],
+   ~batchSizeTarget=2,
+ )
Also applies to: 2429-2433, 2472-2475
codegenerator/cli/npm/envio/src/FetchState.res (1)
1305-1327: Replace all remaining ~maxBatchSize invocations with ~batchSizeTarget
- scenarios/test_codegen/test/ChainManager_test.res (e.g. line 157) and scenarios/test_codegen/test/lib_tests/FetchState_test.res (lines 2387, 2431, 2474)
- codegenerator/cli/npm/envio/src/Prometheus.res (line 552)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (line 53)
- internal_docs/EventFetchers.md signature (if kept in sync)
codegenerator/cli/npm/envio/src/Batch.res (2)
74-96: Batch cap truncates a block; overflow to finish the current block. With a strict items.length < batchSizeTarget guard, the final block can be split across batches, contradicting "Always include all block events to a batch." Allow controlled overflow to drain the current block (same chainId + blockNumber) after hitting the target.
Apply:
 let popOrderedBatchItems = (
-  ~batchSizeTarget,
+  ~batchSizeTarget,
   ~fetchStates: ChainMap.t<FetchState.t>,
   ~sizePerChain: dict<int>,
 ) => {
   let items = []
-  let rec loop = () =>
-    if items->Array.length < batchSizeTarget {
-      switch fetchStates->getOrderedNextItem {
-      | Some({earliestEvent, chain}) =>
-        switch earliestEvent {
-        | NoItem(_) => ()
-        | Item({item, popItemOffQueue}) => {
-            popItemOffQueue()
-            items->Js.Array2.push(item)->ignore
-            sizePerChain->Utils.Dict.incrementByInt(chain->ChainMap.Chain.toChainId)
-            loop()
-          }
-        }
-      | _ => ()
-      }
-    }
-  loop()
+  let rec loop = (lastKey: option<(int, int)>) => {
+    switch fetchStates->getOrderedNextItem {
+    | Some({earliestEvent, chain}) =>
+      switch earliestEvent {
+      | NoItem(_) => ()
+      | Item({item, popItemOffQueue}) => {
+          // Derive block key from comparator: (ts, chainId, blockNumber, logIndex)
+          let (_, chainId, blockNumber, _) =
+            earliestEvent->getQueueItemComparitor(~chain)
+          let curr = items->Array.length
+          let canTake =
+            curr < batchSizeTarget ||
+            switch lastKey {
+            | Some((lastChainId, lastBlock)) => lastChainId === chainId && lastBlock === blockNumber
+            | None => true
+            }
+          if canTake {
+            popItemOffQueue()
+            items->Js.Array2.push(item)->ignore
+            sizePerChain->Utils.Dict.incrementByInt(chainId)
+            loop(Some((chainId, blockNumber)))
+          } else {
+            ()
+          }
+        }
+      }
+    | _ => ()
+    }
+  }
+  loop(None)
   items
 }
101-149: Unordered path can still split a block at the boundary. Same issue as ordered: once batchSizeTarget is reached, the inner per-chain loop stops mid-block. To satisfy the PR goal, continue draining the current block for that chain even if it overflows the target. Suggest extracting a helper that returns a (chainId, blockNumber) key from a queueItem, then gate the inner loop with "batchSize < target OR sameBlockAsLastPoppedForThisChain".
I can draft a small helper signature and refactor if you confirm where to obtain blockNumber from queueItem in this module (EventUtils or FetchState).
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)
115-131: Rename remaining call sites from ~maxBatchSize → ~batchSizeTarget. One test and the docs still use the old param name; update them (GlobalState already uses ~batchSizeTarget at codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res:902).
- scenarios/test_codegen/test/ChainManager_test.res:157 — change ChainManager.createBatch(chainManager, ~maxBatchSize=10000) to ChainManager.createBatch(chainManager, ~batchSizeTarget=10000).
- internal_docs/EventFetchers.md:15 — update the createBatch signature/docs (remove or rename min/maxBatchSize to reflect the new single ~batchSizeTarget param).
🧹 Nitpick comments (12)
codegenerator/cli/npm/envio/src/Utils.res (1)
191-194: Use of %raw to mutate array length — acceptable workaround, but document intent and risks. Directly setting arr.length via %raw bypasses type checks and our guideline to avoid %raw when the type is known. Given the spliceInPlace ignore-return bug noted in this file, this is pragmatic for tests; please add a brief ocaml.doc comment referencing the compiler issue and clarifying this is intentional until the bug is resolved. Consider renaming to clearInPlaceUnsafe to signal the escape-hatch nature.
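A minimal sketch of what that could look like; the rename and doc wording are suggestions rather than the module's actual contents:
/**
 Clears the array in place by mutating its JS length.
 Intentional %raw escape hatch until the spliceInPlace ignore-return compiler bug is resolved.
*/
let clearInPlaceUnsafe: array<'a> => unit = %raw(`(arr) => {
  arr.length = 0
}`)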
scenarios/test_codegen/test/helpers/Mock.res (2)
505-528: Wrapper around optional handler: a small helper would reduce duplication and clarify preload behavior. The inline switch creates an Internal.handler or None and guards on context.isPreload. Extract it into a tiny helper to keep tests concise and make preload behavior explicit.
- handler: switch item.handler {
-   | Some(handler) =>
-     (
-       ({context} as args) => {
-         // We don't want preload optimization for the tests
-         if context.isPreload {
-           Promise.resolve()
-         } else {
-           handler(args)
-         }
-       }
-     )->(
-       Utils.magic: Types.HandlerTypes.loader<unit, unit> => option<
-         Internal.handler,
-       >
-     )
-   | None => None
-   },
+ handler: (
+   switch item.handler {
+   | Some(h) =>
+     (({context} as args) =>
+       context.isPreload ? Promise.resolve() : h(args)
+     )
+     ->(Utils.magic: Types.HandlerTypes.loader<unit, unit> => option<Internal.handler>)
+   | None => None
+   }
+ ),
524-528: contractRegister cast: annotate intent. The magic cast from option<contractRegister> to option<Internal.contractRegister> is standard for these tests; add a short comment that this is a test-only conversion to avoid %raw/object poking.
scenarios/test_codegen/test/rollback/Rollback_test.res (2)
947-949: Clearing recorded calls mid-test: good use of clearInPlace; add a tiny helper to avoid repetition. You clear getItemsOrThrowCalls in-place to isolate the next phase. Consider a small local helper (e.g., let resetCalls = arr => arr->Utils.Array.clearInPlace) reused across tests to keep the intent obvious.
960-970: Multiple microtask flushes: consider a helper. Repeated await Utils.delay(0) calls are a bit noisy. A local flushMicrotasks helper that awaits Promise.resolve() twice keeps tests terse and intention-revealing.
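A rough sketch of such a helper, assuming the Utils.delay already used in these tests (the name flushMicrotasks is illustrative):
// Test-only helper sketch: flush pending microtasks by yielding to the scheduler twice.
let flushMicrotasks = async () => {
  await Utils.delay(0)
  await Utils.delay(0)
}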
scenarios/test_codegen/test/ChainManager_test.res (3)
195-196: Fix test assertion strength: advance lastEvent across batches. The recursive call reuses the initial lastEvent, so the ordering assertion doesn't evolve. Use the last item of the current batch. Apply:
- testThatCreatedEventsAreOrderedCorrectly(nextChainManager, lastEvent)
+ let newLastEvent = items->Utils.Array.last->Option.getExn
+ testThatCreatedEventsAreOrderedCorrectly(nextChainManager, newLastEvent)
51-53: Avoid potential infinite loop in random time generator
blockTime can be 0, risking non-progress in the while loop. Force a minimum of 1. Apply:
- let blockTime = getRandomInt(0, 2 * averageBlockTime)
+ let blockTime = getRandomInt(1, 2 * averageBlockTime)
2119-2119: Remove stray console log in tests
Js.log(updatedFetchState) adds noise to CI output. Apply:
- Js.log(updatedFetchState)
+ /* removed noisy log */
codegenerator/cli/templates/static/codegen/src/Config.res (1)
130-131: Guard batchSize against misconfiguration (0 or negative). A 0 batchSize can cause downstream indexing/array access issues. Clamp to ≥1. Apply:
  ) => {
-   let chainMap =
+   let batchSize = Pervasives.max(batchSize, 1)
+   let chainMap =
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)
2119-2119: Remove stray console log. Drop Js.log(updatedFetchState) to keep tests quiet. Apply:
- Js.log(updatedFetchState)
+ /* removed noisy log */
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (1)
910-922: Minor: compute threshold only when updates exist. You compute shouldEnterReorgThreshold unconditionally; an optional micro-optimization is to only evaluate it when any fetch state actually changed. Not blocking.
9-16: Target buffer formula: double-check intent under many chains. The formula drops to 1×batchSize if activeChainsCount > 3. Validate this is intentional (memory cap) and won't starve prefetch when e.g. 10 chains are active.
If the intent is “min(3, activeChainsCount) batches worth of buffer”, consider:
- config.batchSize * (activeChainsCount > targetBatchesInBuffer ? 1 : targetBatchesInBuffer)
+ config.batchSize * Belt.Int.min(activeChainsCount, targetBatchesInBuffer)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
- codegenerator/cli/npm/envio/src/Batch.res(5 hunks)
- codegenerator/cli/npm/envio/src/FetchState.res(5 hunks)
- codegenerator/cli/npm/envio/src/PgStorage.res(0 hunks)
- codegenerator/cli/npm/envio/src/Utils.res(1 hunks)
- codegenerator/cli/npm/envio/src/db/InternalTable.res(0 hunks)
- codegenerator/cli/templates/static/codegen/src/Config.res(3 hunks)
- codegenerator/cli/templates/static/codegen/src/Env.res(1 hunks)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res(6 hunks)
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res(2 hunks)
- scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res(0 hunks)
- scenarios/test_codegen/test/ChainManager_test.res(3 hunks)
- scenarios/test_codegen/test/helpers/Mock.res(2 hunks)
- scenarios/test_codegen/test/lib_tests/FetchState_test.res(14 hunks)
- scenarios/test_codegen/test/lib_tests/PgStorage_test.res(0 hunks)
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res(1 hunks)
- scenarios/test_codegen/test/rollback/Rollback_test.res(1 hunks)
💤 Files with no reviewable changes (4)
- codegenerator/cli/npm/envio/src/db/InternalTable.res
- scenarios/test_codegen/test/lib_tests/PgStorage_test.res
- codegenerator/cli/npm/envio/src/PgStorage.res
- scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}
📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)
**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using the ref function.
ReScript has record types which require a type definition beforehand. You can access record fields by dot, like foo.myField.
It's also possible to define an inline object; it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.
Files:
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res
- codegenerator/cli/templates/static/codegen/src/Env.res
- codegenerator/cli/templates/static/codegen/src/Config.res
- codegenerator/cli/npm/envio/src/Utils.res
- scenarios/test_codegen/test/rollback/Rollback_test.res
- codegenerator/cli/npm/envio/src/Batch.res
- scenarios/test_codegen/test/helpers/Mock.res
- codegenerator/cli/npm/envio/src/FetchState.res
- scenarios/test_codegen/test/ChainManager_test.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
**/*.res
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored
Files:
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res
- codegenerator/cli/templates/static/codegen/src/Env.res
- codegenerator/cli/templates/static/codegen/src/Config.res
- codegenerator/cli/npm/envio/src/Utils.res
- scenarios/test_codegen/test/rollback/Rollback_test.res
- codegenerator/cli/npm/envio/src/Batch.res
- scenarios/test_codegen/test/helpers/Mock.res
- codegenerator/cli/npm/envio/src/FetchState.res
- scenarios/test_codegen/test/ChainManager_test.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src
Files:
- codegenerator/cli/templates/static/codegen/src/Env.res
- codegenerator/cli/templates/static/codegen/src/Config.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build_and_test
🔇 Additional comments (18)
scenarios/test_codegen/test/helpers/Mock.res (1)
357-362: itemMock optional handler/contractRegister: the type change is sensible for flexibility. Switching to optional fields is aligned with the tests and avoids dummy no-ops. No functional concerns.
scenarios/test_codegen/test/rollback/Rollback_test.res (1)
777-1027: Strong scenario coverage for dynamic contract rollback. The sequence assertions (pre/post rollback, registry state, and resumed fetching from 105) look solid and align with "include all block events to a batch."
scenarios/test_codegen/test/lib_tests/SourceManager_test.res (1)
197-199: dcsToStore default to []: OK. Matches the refactor to non-optional dcsToStore and keeps updateInternal stable.
codegenerator/cli/templates/static/codegen/src/Env.res (1)
15-18: I'm still not seeing any direct references to Env.batchSize in call sites. It appears the batchSize value defined in Env.res is only exposed via the Env module but isn't automatically wired into the application's configuration; callers must explicitly pass it to Config.make or similar. Because we can't confirm any downstream usage of Env.batchSize, please verify manually that wherever the application's top-level config is constructed (e.g., in cli/src/executor/dev.rs, cli/src/executor/codegen.rs, etc.), Env.batchSize is indeed passed into Config.make(~batchSize=…) and that no hard-coded defaults (like the literal 5000) remain. Ensure those call sites handle the None case by supplying a fallback.
scenarios/test_codegen/test/ChainManager_test.res (3)
162-162: LGTM: updatedFetchStates usage. Good switch to {items, updatedFetchStates}. This aligns with the new batch API.
183-187: LGTM: threading updated fetch states. Mapping updatedFetchStates back into chainFetchers looks correct.
279-279: LGTM: dcsToStore default is now []. Tests reflect the non-optional list type.
codegenerator/cli/templates/static/codegen/src/Config.res (1)
117-118: LGTM: batchSize added to Config.t. The field addition is consistent with downstream usage.
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)
165-165: LGTM: migrate dcsToStore from option to array across tests. All expectations updated to [] or concrete arrays match the new FetchState.t shape.
Also applies to: 238-238, 304-304, 402-402, 708-708, 758-758, 878-878, 919-919, 942-942, 1226-1226, 1662-1662, 1702-1702, 1783-1783, 1870-1870
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)
902-905: LGTM: createBatch uses config.batchSize and updatedFetchStates. Correctly passes batchSizeTarget and extracts updatedFetchStates.
937-941: LGTM: queue updates with updatedFetchStates + threshold gating. Dispatching UpdateQueues({updatedFetchStates, shouldEnterReorgThreshold}) matches the new flow.
codegenerator/cli/npm/envio/src/FetchState.res (3)
63-65: LGTM: dcsToStore is now a concrete array. Type and comment updated appropriately.
593-595: LGTM: concat semantics for dcsToStore. Appending newly registered DCs when preexisting ones exist is correct.
1244-1247: LGTM: rollback filters dcsToStore by removed addresses. Good alignment with the new list representation.
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (3)
49-55: Metrics wiring looks correct. Prometheus.ProcessingMaxBatchSize now reflects config.batchSize; IndexingTargetBufferSize uses the computed target. Good.
183-183: Correct source for progressed state. Using updatedFetchStates here is correct given the earlier refactor.
242-247: Return shape matches the new public Batch.t. The return includes updatedFetchStates and dcsToStoreByChainId. LGTM.
codegenerator/cli/npm/envio/src/Batch.res (1)
12-14: Public field rename: updatedFetchStates — verified. Searched the repo; no remaining destructuring or property-access of the old batch field "fetchStates" was found. Templates and tests use updatedFetchStates (e.g. codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res:904,937; scenarios/test_codegen/test/ChainManager_test.res:162). Local parameter names "fetchStates" in Batch.res are unchanged and expected.
        
          
        
Actionable comments posted: 0
🧹 Nitpick comments (5)
codegenerator/cli/npm/envio/src/Utils.res (1)
197-200: Prefer spliceInPlace over raw length mutation. Avoiding %raw keeps the module consistent with existing array externals and reduces JS interop surface.
- let clearInPlace: array<'a> => unit = %raw(`(arr) => {
-   arr.length = 0
- }`)
+ let clearInPlace = (arr: array<'a>): unit => {
+   // Consistent with module's @send spliceInPlace external
+   arr->spliceInPlace(~pos=0, ~remove=arr->Array.length)->ignore
+ }
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (1)
926-941: Avoid progress-only commits during rollback risk (FIXME). The FIXME is valid: advancing progress with empty items can mask needed rollbacks after restart. Consider skipping DB progress writes for the "no items" branch when rollbackState is RollbackInMemStore, or gate behind a flag.
- await Db.sql->InternalTable.Chains.setProgressedChains(
-   ~pgSchema=Db.publicSchema,
-   ~progressedChains,
- )
+ switch state.rollbackState {
+ | RollbackInMemStore(_) => ()
+ | _ =>
+   await Db.sql->InternalTable.Chains.setProgressedChains(
+     ~pgSchema=Db.publicSchema,
+     ~progressedChains,
+   )
+ }
I can open a follow-up task and add a test case if you want.
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)
2135-2135: Remove stray debug log
Js.log(updatedFetchState) adds noise to test output.
- Js.log(updatedFetchState)
+ // No-op
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)
131-134: Use int-key accessor consistently. Prefer dangerouslyGetByIntNonOption(fetchState.chainId) over stringifying the key.
- switch batchSizePerChain->Utils.Dict.dangerouslyGetNonOption(fetchState.chainId->Int.toString) {
+ switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(fetchState.chainId) {
codegenerator/cli/npm/envio/src/Batch.res (1)
20-49: Ordered chain selection — tighten tiebreaker. Selection prefers the smaller timestamp; ties break by the larger chainId due to the guard. Consider an explicit stable tiebreak (e.g., prefer lower chainId) to avoid dependency on key iteration order.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- codegenerator/cli/npm/envio/src/Batch.res(1 hunks)
- codegenerator/cli/npm/envio/src/FetchState.res(6 hunks)
- codegenerator/cli/npm/envio/src/Utils.res(2 hunks)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res(4 hunks)
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res(2 hunks)
- scenarios/test_codegen/test/ChainManager_test.res(2 hunks)
- scenarios/test_codegen/test/lib_tests/FetchState_test.res(25 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}
📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)
**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using the ref function.
ReScript has record types which require a type definition beforehand. You can access record fields by dot, like foo.myField.
It's also possible to define an inline object; it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.
Files:
- scenarios/test_codegen/test/ChainManager_test.res
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
- codegenerator/cli/npm/envio/src/Batch.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- codegenerator/cli/npm/envio/src/Utils.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
**/*.res
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored
Files:
- scenarios/test_codegen/test/ChainManager_test.res
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
- codegenerator/cli/npm/envio/src/Batch.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- codegenerator/cli/npm/envio/src/Utils.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src
Files:
- codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build_and_test
🔇 Additional comments (19)
scenarios/test_codegen/test/ChainManager_test.res (2)
157-163: Tests aligned with new API (batchSizeTarget, updatedFetchStates) — LGTM. Calls and pattern matches are updated correctly to ~batchSizeTarget and {items, updatedFetchStates}.
181-189: Correct propagation of updated fetch states. Mapping updatedFetchStates back into chainFetchers is accurate.
codegenerator/cli/npm/envio/src/Utils.res (1)
73-79: Add integer-key accessor — LGTM
Dict.dangerouslyGetByIntNonOption complements setByInt/incrementByInt and keeps call sites clean where int keys are used. Ensure all reads of int-keyed dicts use this accessor (e.g., places currently using dangerouslyGetNonOption(Int.toString ...)) for consistency.
codegenerator/cli/npm/envio/src/FetchState.res (7)
935-945: getTimestampAt: ordered-mode guard — LGTM. Throwing for Block in ordered multichain prevents undefined timestamp ordering.
954-978: Ready-count preserves block atomicity — LGTM
getReadyItemsCount continues within the current block after hitting targetSize. This avoids splitting blocks across batches. Please confirm there's a unit test asserting no block is split when targetSize cuts through a block.
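A rough sketch of such a test, assuming the mocha-style it/Assert helpers used elsewhere in FetchState_test.res; makeFetchStateWithBuffer is a hypothetical helper that builds a FetchState.t whose buffer holds items at the given (blockNumber, logIndex) positions, all within the ready range:
it("keeps a whole block together when targetSize cuts through it", () => {
  // Buffer blocks: [1, 1, 1, 2, 2]. A target of 2 lands inside block 1,
  // so the ready count should extend to 3 to finish that block.
  let fetchState = makeFetchStateWithBuffer([(1, 0), (1, 1), (1, 2), (2, 0), (2, 1)])
  Assert.equal(fetchState->FetchState.getReadyItemsCount(~targetSize=2, ~fromItem=0), 3)
})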
980-982: Unsafe access is fine with prior guard; keep usage constrained
getUnsafeItemAt assumes a prior readiness check. Keep it internal to code paths that have already validated the count, to avoid accidental misuse.
1120-1121: Initialize dcsToStore as [] — LGTM. Simplifies logic vs the option type and removes Some/None branching elsewhere.
1231-1234: Rollback dcsToStore filter — LGTM. Correctly prunes removed addresses from dcsToStore.
1269-1283: Guard against batchSizeTarget=0 and avoid OOB unsafe_get. The current logic OOBs when batchSizeTarget=0. Handle non-positive targets and switch to safe Array.get.
- let hasFullBatch = ({queue} as fetchState: t, ~batchSizeTarget) => {
-   // Queue is ordered from latest to earliest, so the earliest eligible
-   // item for a full batch of size B is at index (length - B).
-   // Do NOT subtract an extra 1 here; when length === B we should still
-   // classify the queue as full and probe index 0.
-   let targetBlockIdx = queue->Array.length - batchSizeTarget
-   if targetBlockIdx < 0 {
-     false
-   } else {
-     // Unsafe can fail when batchSizeTarget is 0,
-     // but we ignore the case
-     queue->Js.Array2.unsafe_get(targetBlockIdx)->Internal.getItemBlockNumber <=
-       fetchState->bufferBlockNumber
-   }
- }
+ let hasFullBatch = ({queue} as fetchState: t, ~batchSizeTarget) => {
+   if batchSizeTarget <= 0 {
+     false
+   } else {
+     let targetBlockIdx = queue->Array.length - batchSizeTarget
+     switch queue->Array.get(targetBlockIdx) {
+     | Some(item) => item->Internal.getItemBlockNumber <= fetchState->bufferBlockNumber
+     | None => false
+     }
+   }
+ }
1285-1306: Unordered prep uses readiness and stable ordering — LGTM. Filtering via hasReadyItem and sorting by earliest timestamp is appropriate for unordered mode.
codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res (2)
901-905: Config-driven batch size — LGTM. Switching to ~batchSizeTarget=state.config.batchSize aligns with the PR goals.
942-954: Benchmark hooks for batch creation — LGTM. Recording creation time and size on empty/non-empty branches is helpful.
scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)
scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)
8-29: Back-compat test shim — LGTM
oldQueueItem,getItem, andgetEarliestEventaccurately reflect new FetchState API while preserving prior test semantics.
2401-2404: Add 0/negative batchSizeTarget testsEdge-case coverage would catch the
hasFullBatchguard. Add a test with~batchSizeTarget=0to ensure no OOB and false returned.codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (2)
9-16: Verify buffer sizing semantics
config.batchSize * (activeChainsCount > 3 ? 1 : 3) reduces the buffer when many chains are active. Confirm this is intended (1 batch total per chain vs 3 batches when <= 3 chains are active).
131-166: Don’t mutate arrays passed into updateInternal (aliasing hazard)
leftDcsToStoreis passed intoupdateInternaland then mutated. ComputebatchDcs/leftDcsfirst, then callupdateInternalwith the finalizedleftDcs.- | dcs => { - let leftDcsToStore = [] - let batchDcs = [] - let updatedFetchState = - fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore) - let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber - let nextProgressNextBlockLogIndex = - updatedFetchState->FetchState.getProgressNextBlockLogIndex - - dcs->Array.forEach(dc => { + | dcs => { + // First advance state based on popped items + let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems) + let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber + let nextProgressNextBlockLogIndex = tmp->FetchState.getProgressNextBlockLogIndex + + let batchDcs = [] + let leftDcs = [] + dcs->Array.forEach(dc => { // Important: This should be a registering block number. // This works for now since dc.startBlock is a registering block number. if ( dc.startBlock <= nextProgressBlockNumber || switch (nextProgressNextBlockLogIndex, dc.register) { | (Some(nextProgressNextBlockLogIndex), DC(dcData)) => dc.startBlock === nextProgressBlockNumber + 1 && dcData.registeringEventLogIndex <= nextProgressNextBlockLogIndex | _ => false } ) { - batchDcs->Array.push(dc) + batchDcs->Array.push(dc) } else { - // Mutate the array we passed to the updateInternal beforehand - leftDcsToStore->Array.push(dc) + leftDcs->Array.push(dc) } }) - dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs) - updatedFetchState + dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs) + // Now persist remaining DCs + tmp->FetchState.updateInternal(~dcsToStore=leftDcs) }codegenerator/cli/npm/envio/src/Batch.res (3)
78-121: Ordered batching respects block atomicity — LGTM
getReadyItemsCount with fromItem preserves block boundaries while filling to batchSizeTarget.
123-161: Unordered batching via prefiltered/sorted states — LGTM. Accumulates per-chain items without splitting blocks; per-chain sizes are recorded in mutBatchSizePerChain.
51-58: Ready-item checks for modes — LGTM
hasMultichainReadyItem correctly gates batch creation based on multichain mode.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (1)
codegenerator/cli/npm/envio/src/FetchState.res (1)
1256-1285: Avoid nondeterministic sort: remove the random comparator branch. Using Js.Math.random_int inside a sort comparator can yield unstable and pathological sorts. Prefer a deterministic fallback.
- // Use unsafe since we filtered out all queues without batch items
- switch (a.buffer->Belt.Array.getUnsafe(0), b.buffer->Belt.Array.getUnsafe(0)) {
- | (Event({timestamp: aTimestamp}), Event({timestamp: bTimestamp})) =>
-   aTimestamp - bTimestamp
- | (Block(_), _)
- | (_, Block(_)) =>
-   // Currently block items don't have a timestamp,
-   // so we sort chains with them in a random order
-   Js.Math.random_int(-1, 1)
- }
+ // Deterministic ordering: prefer earliest blockNumber; tie-break by timestamp when available; then chainId
+ let a0 = a.buffer->Belt.Array.getUnsafe(0)
+ let b0 = b.buffer->Belt.Array.getUnsafe(0)
+ let bnDiff =
+   a0->Internal.getItemBlockNumber - b0->Internal.getItemBlockNumber
+ if bnDiff !== 0 {
+   bnDiff
+ } else {
+   switch (a0, b0) {
+   | (Event({timestamp: aTs}), Event({timestamp: bTs})) =>
+     let tsDiff = aTs - bTs
+     if tsDiff !== 0 { tsDiff } else { a.chainId - b.chainId }
+   | _ => a.chainId - b.chainId
+   }
+ }
🧹 Nitpick comments (5)
scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res (2)
92-99: Fix outdated "reverse order" comments (buffer is earliest → latest). The buffer stores items earliest-to-latest, but comments still say "reverse order (latest to earliest)". Update these to avoid confusion.
- // Expected in reverse order (latest to earliest): block items have logIndex=16777216, event has logIndex=0
+ // Buffer order is earliest → latest: block items have logIndex=16777216, event has logIndex=0
Also applies to: 146-147, 194-195, 248-249, 300-301
93-97: Nit: rename the local queue var to buffer for consistency. You're reading from updatedFetchState.buffer but binding it to queue. Consider renaming to reduce cognitive load in future diffs.
Also applies to: 140-142, 188-191, 240-243, 295-298
scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)
1946-1953: Correct the assertion message: ordering is ASC, not DESC. The expected buffer is sorted by (blockNumber ASC, logIndex ASC). Update the message.
- ~message="Queue must be sorted DESC by (blockNumber, logIndex) regardless of input order",
+ ~message="Buffer must be sorted ASC by (blockNumber, logIndex) regardless of input order",
2135-2135: Remove leftover debug log.
Js.log(updatedFetchState) is noisy in tests.
- Js.log(updatedFetchState)
+ /* removed debug log */
codegenerator/cli/npm/envio/src/FetchState.res (1)
948-969: Guard getReadyItemsCount for non-positive targetSize. Without this, targetSize=0 can still consume items. Add an early return.
let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+  if targetSize <= 0 {
+    0
+  } else {
   let readyBlockNumber = ref(fetchState->bufferBlockNumber)
   let acc = ref(0)
   let isFinished = ref(false)
   while !isFinished.contents {
     switch fetchState.buffer->Belt.Array.get(fromItem + acc.contents) {
     | Some(item) =>
       let itemBlockNumber = item->Internal.getItemBlockNumber
       if itemBlockNumber <= readyBlockNumber.contents {
         acc := acc.contents + 1
         if acc.contents === targetSize {
           // Should finish accumulating items from the same block
           readyBlockNumber := itemBlockNumber
         }
       } else {
         isFinished := true
       }
     | None => isFinished := true
     }
   }
   acc.contents
+  }
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- codegenerator/cli/npm/envio/src/Batch.res(1 hunks)
- codegenerator/cli/npm/envio/src/FetchState.res(16 hunks)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res(4 hunks)
- scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res(6 hunks)
- scenarios/test_codegen/test/lib_tests/FetchState_test.res(34 hunks)
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res(3 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}
📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)
**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using the ref function.
ReScript has record types which require a type definition beforehand. You can access record fields by dot, like foo.myField.
It's also possible to define an inline object; it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.
Files:
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/npm/envio/src/Batch.res
**/*.res
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored
Files:
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- scenarios/test_codegen/test/lib_tests/FetchState_onBlock_test.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- scenarios/test_codegen/test/lib_tests/SourceManager_test.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/npm/envio/src/Batch.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src
Files:
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build_and_test
🔇 Additional comments (15)
scenarios/test_codegen/test/lib_tests/SourceManager_test.res (2)
152-154: Initialize buffer and dcsToStore as arrays (LGTM). Switching to ~buffer=[] and dcsToStore: [] aligns with the new API.
Also applies to: 189-198
691-699: Pre-seeding buffer for targetBufferSize tests (LGTM). Using buffer seeds to simulate cap behavior matches the new buffering semantics.
scenarios/test_codegen/test/lib_tests/FetchState_test.res (4)
8-28: Backward-compatibility shim (LGTM).
oldQueueItem, getItem, and getEarliestEvent correctly adapt tests to buffer semantics and the ready-count API.
175-181: Adopting buffer and dcsToStore: [] in state assertions (LGTM). Assertions reflect the new concrete fields and ordering.
Also applies to: 246-254, 319-321, 410-418, 888-894
2042-2046: LGTM: earliest-event helper usage matches new buffer semantics. The checks via getEarliestEvent->getItem are consistent and safe.
Also applies to: 2074-2076, 2110-2116, 2178-2180
2401-2405: LGTM: filterAndSortForUnorderedBatch tests target the new batchSizeTarget. Coverage is good for half/full batches and earliest timestamp prioritization.
Also applies to: 2443-2447, 2485-2489
codegenerator/cli/npm/envio/src/FetchState.res (2)
188-194: Comparator matches new buffer order (LGTM). ASC by blockNumber, then logIndex, aligns with earliest→latest buffer semantics.
1118-1128: Rollback buffer pruning and dcs filtering (LGTM). Pruning by (blockNumber, logIndex) and filtering dcsToStore against removed addresses are correct.
Also applies to: 1216-1222
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (4)
9-16: Config-aware targetBufferSize (LGTM). Deriving targetBufferSize from config.batchSize and the chain count looks sound; metrics wiring is updated accordingly.
Also applies to: 49-55
98-106: Expose fetchStates (LGTM).
The getFetchStates accessor simplifies downstream batching logic.
108-122: Batching API updates (LGTM).
- prepareOrdered/UnorderedBatch(~batchSizeTarget, ...) usage is consistent.
- Returning updatedFetchStates and creationTimeMs aligns with the new contract.
Also applies to: 123-124, 130-171, 205-212
137-167: Don’t mutate arrays after passing them into updateInternal.
leftDcsToStoreis passed toupdateInternaland then mutated, which is brittle aliasing. Partition first, then pass the “left” slice.- | dcs => { - let leftDcsToStore = [] - let batchDcs = [] - let updatedFetchState = - fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore) - let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber - let nextProgressNextBlockLogIndex = - updatedFetchState->FetchState.getProgressNextBlockLogIndex - - dcs->Array.forEach(dc => { - // Important: This should be a registering block number. - // This works for now since dc.startBlock is a registering block number. - if ( - dc.startBlock <= nextProgressBlockNumber || - switch (nextProgressNextBlockLogIndex, dc.register) { - | (Some(nextProgressNextBlockLogIndex), DC(dcData)) => - dc.startBlock === nextProgressBlockNumber + 1 && - dcData.registeringEventLogIndex <= nextProgressNextBlockLogIndex - | _ => false - } - ) { - batchDcs->Array.push(dc) - } else { - // Mutate the array we passed to the updateInternal beforehand - leftDcsToStore->Array.push(dc) - } - }) - - dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs) - updatedFetchState - } + | dcs => { + // First, advance internal state based on popped items. + let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems) + let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber + let nextProgressNextBlockLogIndex = tmp->FetchState.getProgressNextBlockLogIndex + + let batchDcs = [] + let leftDcs = [] + dcs->Array.forEach(dc => { + let shouldStoreNow = + dc.startBlock <= nextProgressBlockNumber || + switch (nextProgressNextBlockLogIndex, dc.register) { + | (Some(nextIdx), DC(dcData)) => + dc.startBlock === nextProgressBlockNumber + 1 && + dcData.registeringEventLogIndex <= nextIdx + | _ => false + } + if shouldStoreNow { batchDcs->Array.push(dc) } else { leftDcs->Array.push(dc) } + }) + + dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs) + // Now persist the remaining DCs in the fetch state. + tmp->FetchState.updateInternal(~dcsToStore=leftDcs) + }codegenerator/cli/npm/envio/src/Batch.res (3)
20-49: Ordered chain selection by timestamp index (LGTM, with tie-break).
getOrderedNextChain respects per-chain offsets and provides a deterministic chainId tie-break.
78-121: Block-atomic accumulation in ordered mode (LGTM). Using getReadyItemsCount(~fromItem=itemsCountBefore) ensures full-block inclusion when crossing targets.
123-161: Unordered preparation honors full-batch preference and block atomicity (LGTM). Integration with FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget) and getReadyItemsCount is correct.
Actionable comments posted: 0
🧹 Nitpick comments (6)
scenarios/test_codegen/test/lib_tests/FetchState_test.res (1)
1945-1952: Fix assertion message: ordering is ASC, and it's a buffer. Expected order is earliest→latest. Update the message for accuracy.
- ~message="Queue must be sorted DESC by (blockNumber, logIndex) regardless of input order",
+ ~message="Buffer must be sorted ASC by (blockNumber, logIndex) regardless of input order",
codegenerator/cli/npm/envio/src/FetchState.res (2)
948-969: Guard getReadyItemsCount for non-positive targetSize. Avoid surprising behavior when targetSize <= 0.
-let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
+  if targetSize <= 0 {
+    0
+  } else {
   let readyBlockNumber = ref(fetchState->bufferBlockNumber)
   let acc = ref(0)
   let isFinished = ref(false)
   while !isFinished.contents {
     switch fetchState.buffer->Belt.Array.get(fromItem + acc.contents) {
     | Some(item) =>
       let itemBlockNumber = item->Internal.getItemBlockNumber
       if itemBlockNumber <= readyBlockNumber.contents {
         acc := acc.contents + 1
         if acc.contents === targetSize {
           // Should finish accumulating items from the same block
           readyBlockNumber := itemBlockNumber
         }
       } else {
         isFinished := true
       }
     | None => isFinished := true
     }
   }
   acc.contents
+  }
 }
1256-1261: Edge case: treat batchSizeTarget <= 0 as never full. The current code is safe via Belt.Array.get, but making the intent explicit improves readability.
- let hasFullBatch = ({buffer} as fetchState: t, ~batchSizeTarget) => {
-   switch buffer->Belt.Array.get(batchSizeTarget - 1) {
+ let hasFullBatch = ({buffer} as fetchState: t, ~batchSizeTarget) => {
+   if batchSizeTarget <= 0 {
+     false
+   } else switch buffer->Belt.Array.get(batchSizeTarget - 1) {
    | Some(item) => item->Internal.getItemBlockNumber <= fetchState->bufferBlockNumber
    | None => false
    }
  }
codegenerator/cli/npm/envio/src/Batch.res (2)
16-18: Fix typo in docstring: “returnes” → “returns”.
-/**
- It either returnes an earliest item among all chains, or None if no chains are actively indexing
- */
+/**
+ Returns the earliest chain among all chains, or None if no chains are actively indexing.
+ */
19-48: Clarify tie-break logic and avoid variable shadowing for readability. Rename the outer ref and the inner matched value to avoid shadowing and make the tie-break clearer.
-let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
-  let earliestChain: ref<option<FetchState.t>> = ref(None)
+let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
+  let earliestChainRef: ref<option<FetchState.t>> = ref(None)
   let earliestChainTimestamp = ref(0)
   let chainKeys = fetchStates->ChainMap.keys
   for idx in 0 to chainKeys->Array.length - 1 {
     let chain = chainKeys->Array.get(idx)
     let fetchState = fetchStates->ChainMap.get(chain)
     if fetchState->FetchState.isActivelyIndexing {
       let timestamp = fetchState->FetchState.getTimestampAt(
         ~index=switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
           chain->ChainMap.Chain.toChainId,
         ) {
         | Some(batchSize) => batchSize
         | None => 0
         },
       )
-      switch earliestChain.contents {
-      | Some(earliestChain)
+      switch earliestChainRef.contents {
+      | Some(currEarliest)
         if timestamp > earliestChainTimestamp.contents ||
         (timestamp === earliestChainTimestamp.contents &&
-          chain->ChainMap.Chain.toChainId > earliestChain.chainId) => ()
+          chain->ChainMap.Chain.toChainId > currEarliest.chainId) => ()
       | _ => {
-          earliestChain := Some(fetchState)
+          earliestChainRef := Some(fetchState)
           earliestChainTimestamp := timestamp
         }
       }
     }
-  }
-  earliestChain.contents
+  }
+  earliestChainRef.contents
 }
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (1)
9-16: Double-check buffer sizing formula. Using 1 when activeChainsCount > 3 likely under-provisions the buffer as chains scale. If the intent is "at least 3 batches or activeChainsCount batches," consider:
- | None =>
-   config.batchSize * (activeChainsCount > targetBatchesInBuffer ? 1 : targetBatchesInBuffer)
+ | None =>
+   config.batchSize
+   * (activeChainsCount > targetBatchesInBuffer ? activeChainsCount : targetBatchesInBuffer)
If the existing behavior is intentional, add a brief comment explaining the rationale. Otherwise, apply the change above.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- codegenerator/cli/npm/envio/src/Batch.res(1 hunks)
- codegenerator/cli/npm/envio/src/FetchState.res(16 hunks)
- codegenerator/cli/npm/envio/src/Persistence.res(1 hunks)
- codegenerator/cli/npm/envio/src/db/InternalTable.res(1 hunks)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res(3 hunks)
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res(4 hunks)
- scenarios/test_codegen/test/lib_tests/FetchState_test.res(34 hunks)
- scenarios/test_codegen/test/lib_tests/PgStorage_test.res(8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- codegenerator/cli/npm/envio/src/db/InternalTable.res
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{res,resi}
📄 CodeRabbit inference engine (.cursor/rules/rescript.mdc)
**/*.{res,resi}: Never use [| item |] to create an array. Use [ item ] instead.
Must always use = for setting value to a field. Use := only for ref values created using the ref function.
ReScript has record types which require a type definition beforehand. You can access record fields by dot, like foo.myField.
It's also possible to define an inline object; it'll have quoted fields in this case.
Use records when working with structured data, and objects to conveniently pass payload data between functions.
Never use %raw to access object fields if you know the type.
Files:
- codegenerator/cli/npm/envio/src/Persistence.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
- codegenerator/cli/npm/envio/src/Batch.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- scenarios/test_codegen/test/lib_tests/PgStorage_test.res
**/*.res
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Prefer reading ReScript .res modules directly; compiled .js artifacts can be ignored
Files:
- codegenerator/cli/npm/envio/src/Persistence.res
- codegenerator/cli/npm/envio/src/FetchState.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
- codegenerator/cli/npm/envio/src/Batch.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
- scenarios/test_codegen/test/lib_tests/FetchState_test.res
- scenarios/test_codegen/test/lib_tests/PgStorage_test.res
codegenerator/cli/templates/{static,dynamic}/codegen/src/**/*
📄 CodeRabbit inference engine (.cursor/rules/navigation.mdc)
Edit template versions under codegenerator/cli/templates/static/codegen/src or codegenerator/cli/templates/dynamic/codegen/src instead of editing generated/src
Files:
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res
- codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build_and_test
🔇 Additional comments (21)
codegenerator/cli/npm/envio/src/Persistence.res (1)
145-147: Resume checkpoints: progressBlockNumber is the right pivot — LGTM. Switching checkpoints to use c.progressBlockNumber per chain matches the schema/API changes and simplifies resume.
scenarios/test_codegen/test/lib_tests/PgStorage_test.res (3)
147-147: envio_chains DDL aligns with progress-based tracking. Schema includes progress_block and _num_batches_fetched with sensible NOT NULL constraints. Looks consistent across schemas.
Also applies to: 232-232, 314-314
187-189: Initial INSERTs match the new column set and ordering. Column lists explicitly include source_block/buffer_block/progress_block; the sentinel -1 for buffer/progress aligns with restart logic.
Please confirm we consistently treat progress_block = -1 as “unset” (e.g., ChainFetcher.makeFromDbState).
Also applies to: 581-583, 608-610, 644-647
533-536: Progress fields UPDATE placeholders are correct. Ordering: id=$1, progress_block=$2, events_processed=$3 is coherent and unambiguous.
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res (2)
271-276: Restart fallback logic is correct and explicit. Uses resumedChainState.progressBlockNumber when >= 0, else (startBlock - 1). Passed through to make as progressBlockNumber.
Also applies to: 285-286
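For reference, a minimal sketch of the fallback described above; the field and variable names approximate the ones quoted in this review, not the exact source:
// Resume from the persisted progress block when it was ever set (-1 means "unset"),
// otherwise start just before the configured start block.
let progressBlockNumber =
  resumedChainState.progressBlockNumber >= 0
    ? resumedChainState.progressBlockNumber
    : startBlock - 1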
207-208: processingFilters default to None across constructors. The constructor sets processingFilters: None; addProcessingFilter still enables runtime filters when needed.
scenarios/test_codegen/test/lib_tests/FetchState_test.res (2)
8-16: Back-compat shim (oldQueueItem + getEarliestEvent) is tidy. Keeps legacy Some/None behavior while exercising new buffer semantics. Good coverage.
Also applies to: 19-28
2403-2408: Tests updated to batchSizeTarget — LGTM. Renames and expectations around filterAndSortForUnorderedBatch reflect the new API.
Also applies to: 2445-2450, 2485-2492
codegenerator/cli/npm/envio/src/FetchState.res (2)
64-65: dcsToStore now a plain array — consistent and simpler. Removal of the option reduces branching; updated writes/filters look correct.
Also applies to: 571-574, 1115-1116, 1217-1221
79-80: Buffer semantics and ordering are coherent
- Buffer is earliest→latest; comparator sorts by (blockNumber, logIndex) ascending.
- Metrics updated to use buffer size/block. All consistent.
Also applies to: 188-194, 310-315, 323-329
codegenerator/cli/npm/envio/src/Batch.res (6)
11-14: Type surface changes look good. updatedFetchStates and creationTimeMs integration is clear and consistent with the PR goals.
50-57: LGTM on readiness helper. Efficient reuse via an immutable empty dict and correct gating on hasReadyItem.
59-65: LGTM. Correct readiness check for unordered mode.
67-75: Mode-gated readiness is clear. Switching on multichain mode is straightforward.
122-160: Verified — unordered path won't slice a block mid-way. getReadyItemsCount (FetchState.res) updates readyBlockNumber to the block of the target-th item and continues consuming that block; filterAndSortForUnorderedBatch prioritizes full batches. prepareUnorderedBatch calls getReadyItemsCount per chain, so blocks are not split across batches.
77-118: Confirm getReadyItemsCount preserves block atomicity and may exceed the remaining target. FetchState.res: getReadyItemsCount sets readyBlockNumber to the block of the item that first reaches targetSize and then continues counting only items with that blockNumber, so it will include the rest of that block (possibly exceeding targetSize). It uses blockNumber (not logIndex) and relies on buffer ordering to preserve per-block logIndex order.
codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res (5)
49-54: Prometheus max batch size source updated. Switching to config.batchSize matches the new config-driven sizing.
98-102: Fetch state access simplified. Directly exposing cf.fetchState is clear and avoids deep-copy overhead.
108-124: Batch creation flow aligns with new APIs. Mode selection and per-chain counters are wired correctly to Batch.prepare*.
130-161: Avoid mutating dcsToStore after passing into updateInternal. leftDcsToStore is passed into updateInternal and then mutated (Line 149). This aliasing is brittle and couples to updateInternal's internal semantics.
Apply this safer partition-first approach:
- | dcs => {
-   let leftDcsToStore = []
-   let batchDcs = []
-   let updatedFetchState =
-     fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore)
-   let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
-
-   dcs->Array.forEach(dc => {
-     // Important: This should be a registering block number.
-     // This works for now since dc.startBlock is a registering block number.
-     if dc.startBlock <= nextProgressBlockNumber {
-       batchDcs->Array.push(dc)
-     } else {
-       // Mutate the array we passed to the updateInternal beforehand
-       leftDcsToStore->Array.push(dc)
-     }
-   })
-
-   dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
-   updatedFetchState
- }
+ | dcs => {
+   // First, advance internal state based on popped items.
+   let tmp = fetchState->FetchState.updateInternal(~mutItems=leftItems)
+   let nextProgressBlockNumber = tmp->FetchState.getProgressBlockNumber
+
+   let batchDcs = []
+   let leftDcs = []
+   dcs->Array.forEach(dc => {
+     // dc.startBlock is a registering block number.
+     if dc.startBlock <= nextProgressBlockNumber {
+       batchDcs->Array.push(dc)
+     } else {
+       leftDcs->Array.push(dc)
+     }
+   })
+
+   dcsToStoreByChainId->Js.Dict.set(fetchState.chainId->Int.toString, batchDcs)
+   // Now persist the remaining DCs in the fetch state.
+   tmp->FetchState.updateInternal(~dcsToStore=leftDcs)
+ }
167-201: Progress/accounting assembly looks consistent. progressedChains and creationTimeMs fields are computed coherently with per-chain sizes.
Optional: add a debug counter or assertion in non-prod builds to verify
sum(batchSizePerChain) === items.length for early detection of consistency issues.
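A sketch of what such a non-production assertion could look like (illustrative only; assumes batchSizePerChain is a dict<int> and items is the assembled batch array):
// Debug-only consistency check: per-chain batch sizes must sum to the batch length.
let perChainTotal =
  batchSizePerChain
  ->Js.Dict.values
  ->Belt.Array.reduce(0, (acc, n) => acc + n)
if perChainTotal !== items->Belt.Array.length {
  Js.Exn.raiseError("Batch accounting mismatch: per-chain sizes do not sum to items.length")
}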
NoItem({
  latestFetchedBlock: fetchState->bufferBlock,
})
let getReadyItemsCount = (fetchState: t, ~targetSize: int, ~fromItem) => {
Is this only used in the test? Maybe should be a comment then or added to the naming
Nvmnd I see where its used
~persistence=codegenPersistence,
~ecosystem=InternalConfig.Evm,
~registrations=?,
~batchSize=5000,
Think this should default to Env.maxBatchSize or whatever the env var is?
Oh, I forgot to pass an env variable to the config 😅
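A minimal sketch of the fix being discussed here, assuming the env value is exposed as Env.batchSize (the actual variable name may differ):
  ~persistence=codegenPersistence,
  ~ecosystem=InternalConfig.Evm,
  ~registrations=?,
- ~batchSize=5000,
+ ~batchSize=Env.batchSize,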
// FIXME: When state.rollbackState is RollbackInMemStore
// If we increase progress in this case (no items)
// and then indexer restarts - there's a high chance of missing
// the rollback. This should be tested and fixed.
Is this introduced or was this the previous behaviour?
This was before.
Cool, do you think we have sufficient test coverage on the batch creation and fetch state updates for this change? It's quite significant otherwise and difficult to be sure just from code review.
I'd say the test coverage is quite decent.
I removed
- Update batch creation logic to include full block events in a batch, even if they exceed the batch size.
- Removed the _progress_log_index column from envio_chains since it's not needed.
- processingFilters for rollback
Summary by CodeRabbit