codegenerator/cli/npm/envio/src/Batch.res (190 changes: 101 additions & 89 deletions)
@@ -2,7 +2,6 @@ type progressedChain = {
chainId: int,
batchSize: int,
progressBlockNumber: int,
progressNextBlockLogIndex: option<int>,
totalEventsProcessed: int,
}

@@ -11,137 +10,150 @@ type t = {
progressedChains: array<progressedChain>,
updatedFetchStates: ChainMap.t<FetchState.t>,
dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
}

type multiChainEventComparitor = {
chain: ChainMap.Chain.t,
earliestEvent: FetchState.queueItem,
}

let getQueueItemComparitor = (earliestQueueItem: FetchState.queueItem, ~chain) => {
switch earliestQueueItem {
| Item({item}) => item->EventUtils.getOrderedBatchItemComparator
| NoItem({latestFetchedBlock: {blockTimestamp, blockNumber}}) => (
blockTimestamp,
chain->ChainMap.Chain.toChainId,
blockNumber,
0,
)
}
}

let isQueueItemEarlier = (a: multiChainEventComparitor, b: multiChainEventComparitor): bool => {
a.earliestEvent->getQueueItemComparitor(~chain=a.chain) <
b.earliestEvent->getQueueItemComparitor(~chain=b.chain)
creationTimeMs: int,
}

/**
Either returns the earliest item among all chains, or None if no chains are actively indexing
*/
let getOrderedNextItem = (fetchStates: ChainMap.t<FetchState.t>): option<
multiChainEventComparitor,
> => {
fetchStates
->ChainMap.entries
->Belt.Array.reduce(None, (accum, (chain, fetchState)) => {
// If the fetch state has reached the end block we don't need to consider it
let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
let earliestChain: ref<option<FetchState.t>> = ref(None)
let earliestChainTimestamp = ref(0)
let chainKeys = fetchStates->ChainMap.keys
for idx in 0 to chainKeys->Array.length - 1 {
let chain = chainKeys->Array.get(idx)
let fetchState = fetchStates->ChainMap.get(chain)
if fetchState->FetchState.isActivelyIndexing {
let earliestEvent = fetchState->FetchState.getEarliestEvent
let current: multiChainEventComparitor = {chain, earliestEvent}
switch accum {
| Some(previous) if isQueueItemEarlier(previous, current) => accum
| _ => Some(current)
let timestamp = fetchState->FetchState.getTimestampAt(
~index=switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
chain->ChainMap.Chain.toChainId,
) {
| Some(batchSize) => batchSize
| None => 0
},
)
switch earliestChain.contents {
| Some(earliestChain)
if timestamp > earliestChainTimestamp.contents ||
(timestamp === earliestChainTimestamp.contents &&
chain->ChainMap.Chain.toChainId > earliestChain.chainId) => ()
| _ => {
earliestChain := Some(fetchState)
earliestChainTimestamp := timestamp
}
}
} else {
accum
}
})
}
earliestChain.contents
}
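
A minimal standalone sketch of the selection rule implemented above, with hypothetical values that are not part of this PR: the chain with the smaller next-item timestamp wins, and on a timestamp tie the lower chainId wins, which matches lexicographic comparison of (timestamp, chainId) tuples.

// Hypothetical (timestamp, chainId) pairs for two chains
let chainA = (1_700_000_000, 1)
let chainB = (1_700_000_000, 137)
// Tuples compare lexicographically, so on a timestamp tie the lower chainId is picked first
Js.log(chainA < chainB) // true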

// Avoid the overhead of recreating the dict on every call
let immutableEmptyBatchSizePerChain: dict<int> = Js.Dict.empty()
let hasOrderedReadyItem = (fetchStates: ChainMap.t<FetchState.t>) => {
switch fetchStates->getOrderedNextChain(~batchSizePerChain=immutableEmptyBatchSizePerChain) {
| Some(fetchState) => fetchState->FetchState.hasReadyItem
| None => false
}
}

let hasUnorderedNextItem = (fetchStates: ChainMap.t<FetchState.t>) => {
let hasUnorderedReadyItem = (fetchStates: ChainMap.t<FetchState.t>) => {
fetchStates
->ChainMap.values
->Js.Array2.some(fetchState => {
fetchState->FetchState.isActivelyIndexing &&
switch fetchState->FetchState.getEarliestEvent {
| Item(_) => true
| NoItem(_) => false
}
fetchState->FetchState.isActivelyIndexing && fetchState->FetchState.hasReadyItem
})
}

let popOrderedBatchItems = (
~maxBatchSize,
let hasMultichainReadyItem = (
fetchStates: ChainMap.t<FetchState.t>,
~multichain: InternalConfig.multichain,
) => {
switch multichain {
| Ordered => hasOrderedReadyItem(fetchStates)
| Unordered => hasUnorderedReadyItem(fetchStates)
}
}
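
A hedged usage sketch for the readiness check above; `fetchStates` and a `multichain` config value (of type InternalConfig.multichain) are assumed to already be in scope in the caller and are not defined in this PR.

// Only prepare a batch once at least one chain has a ready item for the configured ordering mode
let shouldPrepareBatch = fetchStates->hasMultichainReadyItem(~multichain)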

let prepareOrderedBatch = (
~batchSizeTarget,
~fetchStates: ChainMap.t<FetchState.t>,
~sizePerChain: dict<int>,
~mutBatchSizePerChain: dict<int>,
) => {
let batchSize = ref(0)
let isFinished = ref(false)
let items = []

let rec loop = () =>
if items->Array.length < maxBatchSize {
switch fetchStates->getOrderedNextItem {
| Some({earliestEvent, chain}) =>
switch earliestEvent {
| NoItem(_) => ()
| Item({item, popItemOffQueue}) => {
popItemOffQueue()
items->Js.Array2.push(item)->ignore
sizePerChain->Utils.Dict.incrementByInt(chain->ChainMap.Chain.toChainId)
loop()
while batchSize.contents < batchSizeTarget && !isFinished.contents {
switch fetchStates->getOrderedNextChain(~batchSizePerChain=mutBatchSizePerChain) {
| Some(fetchState) => {
let itemsCountBefore = switch mutBatchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
fetchState.chainId,
) {
| Some(batchSize) => batchSize
| None => 0
}
let newItemsCount =
fetchState->FetchState.getReadyItemsCount(
~targetSize=batchSizeTarget - batchSize.contents,
~fromItem=itemsCountBefore,
)

if newItemsCount > 0 {
for idx in itemsCountBefore to itemsCountBefore + newItemsCount - 1 {
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
}
batchSize := batchSize.contents + newItemsCount
mutBatchSizePerChain->Utils.Dict.setByInt(
fetchState.chainId,
itemsCountBefore + newItemsCount,
)
} else {
isFinished := true
}
| _ => ()
}

| None => isFinished := true
}
loop()
}

items
}
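
A hedged caller sketch for prepareOrderedBatch; the batch size target of 5000 and the in-scope `fetchStates` value are illustrative assumptions, not values taken from this PR.

// The dict is mutated in place: after the call it maps each chainId to the
// number of items taken from that chain's buffer for this batch.
let mutBatchSizePerChain = Js.Dict.empty()
let items = prepareOrderedBatch(~batchSizeTarget=5000, ~fetchStates, ~mutBatchSizePerChain)
Js.log2("total batch size", items->Array.length)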

let popUnorderedBatchItems = (
~maxBatchSize,
let prepareUnorderedBatch = (
~batchSizeTarget,
~fetchStates: ChainMap.t<FetchState.t>,
~sizePerChain: dict<int>,
~mutBatchSizePerChain: dict<int>,
) => {
let items = []

let preparedFetchStates =
fetchStates
->ChainMap.values
->FetchState.filterAndSortForUnorderedBatch(~maxBatchSize)
->FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget)

let idx = ref(0)
let chainIdx = ref(0)
let preparedNumber = preparedFetchStates->Array.length
let batchSize = ref(0)

let items = []

// Accumulate items across all actively indexing chains,
// grouping as many items from a single chain as possible,
// so that the loader optimisations hit more often
while batchSize.contents < maxBatchSize && idx.contents < preparedNumber {
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(idx.contents)
let batchSizeBeforeTheChain = batchSize.contents

let rec loop = () =>
if batchSize.contents < maxBatchSize {
let earliestEvent = fetchState->FetchState.getEarliestEvent
switch earliestEvent {
| NoItem(_) => ()
| Item({item, popItemOffQueue}) => {
popItemOffQueue()
items->Js.Array2.push(item)->ignore
batchSize := batchSize.contents + 1
loop()
}
}
}
loop()

let chainBatchSize = batchSize.contents - batchSizeBeforeTheChain
while batchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(chainIdx.contents)
let chainBatchSize =
fetchState->FetchState.getReadyItemsCount(
~targetSize=batchSizeTarget - batchSize.contents,
~fromItem=0,
)
if chainBatchSize > 0 {
sizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
for idx in 0 to chainBatchSize - 1 {
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
}
batchSize := batchSize.contents + chainBatchSize
mutBatchSizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
}

idx := idx.contents + 1
chainIdx := chainIdx.contents + 1
}

items