Skip to content

Commit c4b414b

Browse files
committed
Remove eventOrigin, introduce context chains
1 parent 63fa750 commit c4b414b

File tree

14 files changed

+198
-121
lines changed

14 files changed

+198
-121
lines changed

codegenerator/cli/npm/envio/src/Internal.gen.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export type entityHandlerContext<entity> = {
4242
readonly deleteUnsafe: (_1:string) => void
4343
};
4444

45-
export type eventOrigin = "Historical" | "Live";
45+
export type chainInfo = { readonly isReady: boolean };
4646

4747
export type genericHandlerWithLoader<loader,handler,eventFilters> = {
4848
readonly loader: loader;

codegenerator/cli/npm/envio/src/Internal.res

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,16 @@ type entityHandlerContext<'entity> = {
5555
}
5656

5757
@genType
58-
type eventOrigin = Historical | Live
58+
type chainInfo = {
59+
isReady: bool,
60+
}
61+
62+
type chains = dict<chainInfo>
5963

6064
type loaderReturn
6165
type handlerContext = private {
6266
isPreload: bool,
63-
eventOrigin: eventOrigin,
67+
chains: chains,
6468
}
6569
type handlerArgs = {
6670
event: event,

codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,20 +405,24 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async (
405405
}
406406
}
407407

408+
// Create a mock chains state where the processing chain is ready (simulating "Live" mode)
409+
let chains = Js.Dict.empty()
410+
chains->Js.Dict.set(processingChainId->Int.toString, {Internal.isReady: true})
411+
408412
try {
409413
await items->EventProcessing.preloadBatchOrThrow(
410414
~loadManager,
411415
~persistence,
412416
~inMemoryStore,
413-
~eventOrigin=Internal.Live,
417+
~chains,
414418
)
415419
await items->EventProcessing.runBatchHandlersOrThrow(
416420
~inMemoryStore,
417421
~loadManager,
418422
~config,
419423
~shouldSaveHistory=false,
420424
~shouldBenchmark=false,
421-
~eventOrigin=Internal.Live,
425+
~chains,
422426
)
423427
} catch {
424428
| EventProcessing.ProcessingError({message, exn, item}) =>

codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type handlerContext = {
2929
log: Envio.logger,
3030
effect: 'input 'output. (Envio.effect<'input, 'output>, 'input) => promise<'output>,
3131
isPreload: bool,
32-
eventOrigin: Internal.eventOrigin,
32+
chains: Internal.chains,
3333
{{#each entities as | entity |}}
3434
@as("{{entity.name.original}}") {{entity.name.uncapitalized}}: entityHandlerContext<Entities.{{entity.name.capitalized}}.t, Entities.{{entity.name.capitalized}}.indexedFieldOperations>,
3535
{{/each}}
@@ -50,7 +50,7 @@ type loaderContext = {
5050
log: Envio.logger,
5151
effect: 'input 'output. (Envio.effect<'input, 'output>, 'input) => promise<'output>,
5252
isPreload: bool,
53-
eventOrigin: Internal.eventOrigin,
53+
chains: Internal.chains,
5454
{{#each entities as | entity |}}
5555
@as("{{entity.name.original}}") {{entity.name.uncapitalized}}: entityLoaderContext<Entities.{{entity.name.capitalized}}.t, Entities.{{entity.name.capitalized}}.indexedFieldOperations>,
5656
{{/each}}
@@ -63,7 +63,7 @@ type entityHandlerContext<'entity> = Internal.entityHandlerContext<'entity>
6363
type handlerContext = {
6464
log: Envio.logger,
6565
effect: 'input 'output. (Envio.effect<'input, 'output>, 'input) => promise<'output>,
66-
eventOrigin: Internal.eventOrigin,
66+
chains: Internal.chains,
6767
{{#each entities as | entity |}}
6868
@as("{{entity.name.original}}") {{entity.name.uncapitalized}}: entityHandlerContext<Entities.{{entity.name.capitalized}}.t>,
6969
{{/each}}

codegenerator/cli/templates/static/codegen/src/EventProcessing.res

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
open Belt
22

3-
let allChainsEventsProcessedToEndblock = (chainFetchers: ChainMap.t<ChainFetcher.t>) => {
3+
let computeChainsState = (chainFetchers: ChainMap.t<ChainFetcher.t>): Internal.chains => {
4+
let chains = Js.Dict.empty()
5+
46
chainFetchers
5-
->ChainMap.values
6-
->Array.every(cf => cf->ChainFetcher.hasProcessedToEndblock)
7+
->ChainMap.entries
8+
->Array.forEach(((chain, chainFetcher)) => {
9+
let chainId = chain->ChainMap.Chain.toChainId->Int.toString
10+
let isReady = chainFetcher->ChainFetcher.hasProcessedToEndblock
11+
12+
chains->Js.Dict.set(chainId, {
13+
Internal.isReady: isReady,
14+
})
15+
})
16+
17+
chains
718
}
819

920
let convertFieldsToJson = (fields: option<dict<unknown>>) => {
@@ -87,7 +98,7 @@ let runEventHandlerOrThrow = async (
8798
~persistence,
8899
~shouldSaveHistory,
89100
~shouldBenchmark,
90-
~eventOrigin: Internal.eventOrigin,
101+
~chains: Internal.chains,
91102
) => {
92103
let eventItem = item->Internal.castUnsafeEventItem
93104

@@ -106,7 +117,7 @@ let runEventHandlerOrThrow = async (
106117
persistence,
107118
shouldSaveHistory,
108119
isPreload: false,
109-
eventOrigin,
120+
chains,
110121
}),
111122
}: Internal.handlerArgs
112123
),
@@ -139,7 +150,7 @@ let runHandlerOrThrow = async (
139150
~config: Config.t,
140151
~shouldSaveHistory,
141152
~shouldBenchmark,
142-
~eventOrigin: Internal.eventOrigin,
153+
~chains: Internal.chains,
143154
) => {
144155
switch item {
145156
| Block({onBlockConfig: {handler, chainId}, blockNumber}) =>
@@ -158,7 +169,7 @@ let runHandlerOrThrow = async (
158169
persistence: config.persistence,
159170
shouldSaveHistory,
160171
isPreload: false,
161-
eventOrigin,
172+
chains,
162173
}),
163174
}: Internal.onBlockArgs
164175
),
@@ -183,7 +194,7 @@ let runHandlerOrThrow = async (
183194
~persistence=config.persistence,
184195
~shouldSaveHistory,
185196
~shouldBenchmark,
186-
~eventOrigin,
197+
~chains,
187198
)
188199
| None => ()
189200
}
@@ -200,7 +211,7 @@ let preloadBatchOrThrow = async (
200211
~loadManager,
201212
~persistence,
202213
~inMemoryStore,
203-
~eventOrigin: Internal.eventOrigin,
214+
~chains: Internal.chains,
204215
) => {
205216
// On the first run of loaders, we don't care about the result,
206217
// whether it's an error or a return type.
@@ -224,7 +235,7 @@ let preloadBatchOrThrow = async (
224235
persistence,
225236
isPreload: true,
226237
shouldSaveHistory: false,
227-
eventOrigin,
238+
chains,
228239
}),
229240
})->Promise.silentCatch,
230241
// Must have Promise.catch as well as normal catch,
@@ -250,7 +261,7 @@ let preloadBatchOrThrow = async (
250261
persistence,
251262
isPreload: true,
252263
shouldSaveHistory: false,
253-
eventOrigin,
264+
chains,
254265
}),
255266
})->Promise.silentCatch,
256267
)
@@ -269,7 +280,7 @@ let runBatchHandlersOrThrow = async (
269280
~config,
270281
~shouldSaveHistory,
271282
~shouldBenchmark,
272-
~eventOrigin: Internal.eventOrigin,
283+
~chains: Internal.chains,
273284
) => {
274285
for i in 0 to eventBatch->Array.length - 1 {
275286
let item = eventBatch->Js.Array2.unsafe_get(i)
@@ -280,7 +291,7 @@ let runBatchHandlersOrThrow = async (
280291
~config,
281292
~shouldSaveHistory,
282293
~shouldBenchmark,
283-
~eventOrigin,
294+
~chains,
284295
)
285296
}
286297
}
@@ -319,12 +330,8 @@ let processEventBatch = async (
319330
~config: Config.t,
320331
~chainFetchers: ChainMap.t<ChainFetcher.t>,
321332
) => {
322-
// Determine event origin based on whether all chains have caught up
323-
let eventOrigin: Internal.eventOrigin = if chainFetchers->allChainsEventsProcessedToEndblock {
324-
Live
325-
} else {
326-
Historical
327-
}
333+
// Compute chains state for this batch
334+
let chains: Internal.chains = chainFetchers->computeChainsState
328335
let batchSize = items->Array.length
329336
let byChain = Js.Dict.empty()
330337
progressedChains->Js.Array2.forEach(data => {
@@ -350,7 +357,7 @@ let processEventBatch = async (
350357
try {
351358
let timeRef = Hrtime.makeTimer()
352359

353-
await items->preloadBatchOrThrow(~loadManager, ~persistence=config.persistence, ~inMemoryStore, ~eventOrigin)
360+
await items->preloadBatchOrThrow(~loadManager, ~persistence=config.persistence, ~inMemoryStore, ~chains)
354361

355362
let elapsedTimeAfterLoaders = timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis
356363

@@ -360,7 +367,7 @@ let processEventBatch = async (
360367
~config,
361368
~shouldSaveHistory=config->Config.shouldSaveHistory(~isInReorgThreshold),
362369
~shouldBenchmark=Env.Benchmark.shouldSaveData,
363-
~eventOrigin,
370+
~chains,
364371
)
365372

366373
let elapsedTimeAfterProcessing =

codegenerator/cli/templates/static/codegen/src/UserContext.res

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type contextParams = {
2424
persistence: Persistence.t,
2525
isPreload: bool,
2626
shouldSaveHistory: bool,
27-
eventOrigin: Internal.eventOrigin,
27+
chains: Internal.chains,
2828
}
2929

3030
let rec initEffect = (params: contextParams) => (
@@ -237,7 +237,7 @@ let handlerTraps: Utils.Proxy.traps<contextParams> = {
237237
)
238238

239239
| "isPreload" => params.isPreload->Utils.magic
240-
| "eventOrigin" => params.eventOrigin->Utils.magic
240+
| "chains" => params.chains->Utils.magic
241241
| _ =>
242242
switch Entities.byName->Utils.Dict.dangerouslyGetNonOption(prop) {
243243
| Some(entityConfig) =>
@@ -248,7 +248,7 @@ let handlerTraps: Utils.Proxy.traps<contextParams> = {
248248
loadManager: params.loadManager,
249249
persistence: params.persistence,
250250
shouldSaveHistory: params.shouldSaveHistory,
251-
eventOrigin: params.eventOrigin,
251+
chains: params.chains,
252252
entityConfig,
253253
}
254254
->Utils.Proxy.make(entityTraps)

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,9 +629,11 @@ let actionReducer = (state: t, action: action) => {
629629
processedBatches: state.processedBatches + 1,
630630
}
631631

632-
let shouldExit = EventProcessing.allChainsEventsProcessedToEndblock(
633-
state.chainManager.chainFetchers,
634-
)
632+
// Check if all chains have reached their end blocks
633+
let chains = EventProcessing.computeChainsState(state.chainManager.chainFetchers)
634+
let allChainsReady = chains->Js.Dict.values->Array.every(chainInfo => chainInfo.isReady)
635+
636+
let shouldExit = allChainsReady
635637
? {
636638
// state.config.persistence.storage
637639
Logging.info("All chains are caught up to end blocks.")

scenarios/test_codegen/pnpm-lock.yaml

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scenarios/test_codegen/src/EventHandlers.res

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,20 @@ Handlers.Gravatar.TestEvent.handler(async _ => {
141141
()
142142
})
143143

144-
// Test eventOrigin accessibility - exposed for testing
145-
let lastEmptyEventOrigin: ref<option<Internal.eventOrigin>> = ref(None)
144+
// Test chains accessibility - exposed for testing
145+
// Instead of a single eventOrigin enum, we store the entire chains dict
146+
let lastEmptyEventChains: ref<option<Internal.chains>> = ref(None)
146147

147148
Handlers.Gravatar.EmptyEvent.handler(async ({context}) => {
148-
// This handler tests that eventOrigin is accessible in the context
149-
// It will be Historical during sync and Live during live indexing
150-
lastEmptyEventOrigin := Some(context.eventOrigin)
151-
// Log it so we can verify it's working
152-
switch context.eventOrigin {
153-
| Historical => context.log.debug("Processing historical event")
154-
| Live => context.log.debug("Processing live event")
155-
}
149+
// This handler tests that chains state is accessible in the context
150+
// Chains will have isReady: false during sync and isReady: true during live indexing
151+
lastEmptyEventChains := Some(context.chains)
152+
153+
// Log chain states for verification
154+
context.chains
155+
->Js.Dict.entries
156+
->Belt.Array.forEach(((chainId, chainInfo)) => {
157+
let status = chainInfo.isReady ? "ready (live)" : "syncing (historical)"
158+
context.log.debug(`Chain ${chainId} status: ${status}`)
159+
})
156160
})

0 commit comments

Comments
 (0)