Skip to content

Commit d8846b4

Browse files
authored
Always include all block events to a batch (#753)
* Persist only processed DCs
* Add ENVIO_BATCH_SIZE and deprecate MAX_BATCH_SIZE
* Update batch creation logic
* Asc order for buffer
* Remove progressNextBlockLogIndex
* Fix batch size env usage
1 parent 718a8f6 commit d8846b4

File tree

16 files changed

+360
-616
lines changed

16 files changed

+360
-616
lines changed

codegenerator/cli/npm/envio/src/Batch.res

Lines changed: 101 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ type progressedChain = {
22
chainId: int,
33
batchSize: int,
44
progressBlockNumber: int,
5-
progressNextBlockLogIndex: option<int>,
65
totalEventsProcessed: int,
76
}
87

@@ -11,137 +10,150 @@ type t = {
1110
progressedChains: array<progressedChain>,
1211
updatedFetchStates: ChainMap.t<FetchState.t>,
1312
dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
14-
}
15-
16-
type multiChainEventComparitor = {
17-
chain: ChainMap.Chain.t,
18-
earliestEvent: FetchState.queueItem,
19-
}
20-
21-
let getQueueItemComparitor = (earliestQueueItem: FetchState.queueItem, ~chain) => {
22-
switch earliestQueueItem {
23-
| Item({item}) => item->EventUtils.getOrderedBatchItemComparator
24-
| NoItem({latestFetchedBlock: {blockTimestamp, blockNumber}}) => (
25-
blockTimestamp,
26-
chain->ChainMap.Chain.toChainId,
27-
blockNumber,
28-
0,
29-
)
30-
}
31-
}
32-
33-
let isQueueItemEarlier = (a: multiChainEventComparitor, b: multiChainEventComparitor): bool => {
34-
a.earliestEvent->getQueueItemComparitor(~chain=a.chain) <
35-
b.earliestEvent->getQueueItemComparitor(~chain=b.chain)
13+
creationTimeMs: int,
3614
}
3715

3816
/**
3917
It either returns the earliest item among all chains, or None if no chains are actively indexing
4018
*/
41-
let getOrderedNextItem = (fetchStates: ChainMap.t<FetchState.t>): option<
42-
multiChainEventComparitor,
43-
> => {
44-
fetchStates
45-
->ChainMap.entries
46-
->Belt.Array.reduce(None, (accum, (chain, fetchState)) => {
47-
// If the fetch state has reached the end block we don't need to consider it
19+
let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerChain) => {
20+
let earliestChain: ref<option<FetchState.t>> = ref(None)
21+
let earliestChainTimestamp = ref(0)
22+
let chainKeys = fetchStates->ChainMap.keys
23+
for idx in 0 to chainKeys->Array.length - 1 {
24+
let chain = chainKeys->Array.get(idx)
25+
let fetchState = fetchStates->ChainMap.get(chain)
4826
if fetchState->FetchState.isActivelyIndexing {
49-
let earliestEvent = fetchState->FetchState.getEarliestEvent
50-
let current: multiChainEventComparitor = {chain, earliestEvent}
51-
switch accum {
52-
| Some(previous) if isQueueItemEarlier(previous, current) => accum
53-
| _ => Some(current)
27+
let timestamp = fetchState->FetchState.getTimestampAt(
28+
~index=switch batchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
29+
chain->ChainMap.Chain.toChainId,
30+
) {
31+
| Some(batchSize) => batchSize
32+
| None => 0
33+
},
34+
)
35+
switch earliestChain.contents {
36+
| Some(earliestChain)
37+
if timestamp > earliestChainTimestamp.contents ||
38+
(timestamp === earliestChainTimestamp.contents &&
39+
chain->ChainMap.Chain.toChainId > earliestChain.chainId) => ()
40+
| _ => {
41+
earliestChain := Some(fetchState)
42+
earliestChainTimestamp := timestamp
43+
}
5444
}
55-
} else {
56-
accum
5745
}
58-
})
46+
}
47+
earliestChain.contents
48+
}
49+
50+
// Save overhead of recreating the dict every time
51+
let immutableEmptyBatchSizePerChain: dict<int> = Js.Dict.empty()
52+
let hasOrderedReadyItem = (fetchStates: ChainMap.t<FetchState.t>) => {
53+
switch fetchStates->getOrderedNextChain(~batchSizePerChain=immutableEmptyBatchSizePerChain) {
54+
| Some(fetchState) => fetchState->FetchState.hasReadyItem
55+
| None => false
56+
}
5957
}
6058

61-
let hasUnorderedNextItem = (fetchStates: ChainMap.t<FetchState.t>) => {
59+
let hasUnorderedReadyItem = (fetchStates: ChainMap.t<FetchState.t>) => {
6260
fetchStates
6361
->ChainMap.values
6462
->Js.Array2.some(fetchState => {
65-
fetchState->FetchState.isActivelyIndexing &&
66-
switch fetchState->FetchState.getEarliestEvent {
67-
| Item(_) => true
68-
| NoItem(_) => false
69-
}
63+
fetchState->FetchState.isActivelyIndexing && fetchState->FetchState.hasReadyItem
7064
})
7165
}
7266

73-
let popOrderedBatchItems = (
74-
~maxBatchSize,
67+
let hasMultichainReadyItem = (
68+
fetchStates: ChainMap.t<FetchState.t>,
69+
~multichain: InternalConfig.multichain,
70+
) => {
71+
switch multichain {
72+
| Ordered => hasOrderedReadyItem(fetchStates)
73+
| Unordered => hasUnorderedReadyItem(fetchStates)
74+
}
75+
}
76+
77+
let prepareOrderedBatch = (
78+
~batchSizeTarget,
7579
~fetchStates: ChainMap.t<FetchState.t>,
76-
~sizePerChain: dict<int>,
80+
~mutBatchSizePerChain: dict<int>,
7781
) => {
82+
let batchSize = ref(0)
83+
let isFinished = ref(false)
7884
let items = []
7985

80-
let rec loop = () =>
81-
if items->Array.length < maxBatchSize {
82-
switch fetchStates->getOrderedNextItem {
83-
| Some({earliestEvent, chain}) =>
84-
switch earliestEvent {
85-
| NoItem(_) => ()
86-
| Item({item, popItemOffQueue}) => {
87-
popItemOffQueue()
88-
items->Js.Array2.push(item)->ignore
89-
sizePerChain->Utils.Dict.incrementByInt(chain->ChainMap.Chain.toChainId)
90-
loop()
86+
while batchSize.contents < batchSizeTarget && !isFinished.contents {
87+
switch fetchStates->getOrderedNextChain(~batchSizePerChain=mutBatchSizePerChain) {
88+
| Some(fetchState) => {
89+
let itemsCountBefore = switch mutBatchSizePerChain->Utils.Dict.dangerouslyGetByIntNonOption(
90+
fetchState.chainId,
91+
) {
92+
| Some(batchSize) => batchSize
93+
| None => 0
94+
}
95+
let newItemsCount =
96+
fetchState->FetchState.getReadyItemsCount(
97+
~targetSize=batchSizeTarget - batchSize.contents,
98+
~fromItem=itemsCountBefore,
99+
)
100+
101+
if newItemsCount > 0 {
102+
for idx in itemsCountBefore to itemsCountBefore + newItemsCount - 1 {
103+
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
91104
}
105+
batchSize := batchSize.contents + newItemsCount
106+
mutBatchSizePerChain->Utils.Dict.setByInt(
107+
fetchState.chainId,
108+
itemsCountBefore + newItemsCount,
109+
)
110+
} else {
111+
isFinished := true
92112
}
93-
| _ => ()
94113
}
114+
115+
| None => isFinished := true
95116
}
96-
loop()
117+
}
97118

98119
items
99120
}
100121

101-
let popUnorderedBatchItems = (
102-
~maxBatchSize,
122+
let prepareUnorderedBatch = (
123+
~batchSizeTarget,
103124
~fetchStates: ChainMap.t<FetchState.t>,
104-
~sizePerChain: dict<int>,
125+
~mutBatchSizePerChain: dict<int>,
105126
) => {
106-
let items = []
107-
108127
let preparedFetchStates =
109128
fetchStates
110129
->ChainMap.values
111-
->FetchState.filterAndSortForUnorderedBatch(~maxBatchSize)
130+
->FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget)
112131

113-
let idx = ref(0)
132+
let chainIdx = ref(0)
114133
let preparedNumber = preparedFetchStates->Array.length
115134
let batchSize = ref(0)
116135

136+
let items = []
137+
117138
// Accumulate items for all actively indexing chains
118139
// the way to group as many items from a single chain as possible
119140
// This way the loaders optimisations will hit more often
120-
while batchSize.contents < maxBatchSize && idx.contents < preparedNumber {
121-
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(idx.contents)
122-
let batchSizeBeforeTheChain = batchSize.contents
123-
124-
let rec loop = () =>
125-
if batchSize.contents < maxBatchSize {
126-
let earliestEvent = fetchState->FetchState.getEarliestEvent
127-
switch earliestEvent {
128-
| NoItem(_) => ()
129-
| Item({item, popItemOffQueue}) => {
130-
popItemOffQueue()
131-
items->Js.Array2.push(item)->ignore
132-
batchSize := batchSize.contents + 1
133-
loop()
134-
}
135-
}
136-
}
137-
loop()
138-
139-
let chainBatchSize = batchSize.contents - batchSizeBeforeTheChain
141+
while batchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
142+
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(chainIdx.contents)
143+
let chainBatchSize =
144+
fetchState->FetchState.getReadyItemsCount(
145+
~targetSize=batchSizeTarget - batchSize.contents,
146+
~fromItem=0,
147+
)
140148
if chainBatchSize > 0 {
141-
sizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
149+
for idx in 0 to chainBatchSize - 1 {
150+
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
151+
}
152+
batchSize := batchSize.contents + chainBatchSize
153+
mutBatchSizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
142154
}
143155

144-
idx := idx.contents + 1
156+
chainIdx := chainIdx.contents + 1
145157
}
146158

147159
items

0 commit comments

Comments
 (0)