Skip to content

Commit ea41c42

Browse files
authored
Prevent invalid rollback with the depth 0 (#544)
* Always update hashes state * Prevent rollback with depth 0 * Better reorg guard naming and fix tests * Fix merge conflict error
1 parent 540f85a commit ea41c42

File tree

14 files changed

+483
-294
lines changed

14 files changed

+483
-294
lines changed

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -940,14 +940,14 @@ let queueItemBlockNumber = (queueItem: queueItem) => {
940940
let queueItemIsInReorgThreshold = (
941941
queueItem: queueItem,
942942
~currentBlockHeight,
943-
~heighestBlockBelowThreshold,
943+
~highestBlockBelowThreshold,
944944
) => {
945945
if currentBlockHeight === 0 {
946946
false
947947
} else {
948948
switch queueItem {
949-
| Item(_) => queueItem->queueItemBlockNumber > heighestBlockBelowThreshold
950-
| NoItem(_) => queueItem->queueItemBlockNumber > heighestBlockBelowThreshold
949+
| Item(_) => queueItem->queueItemBlockNumber > highestBlockBelowThreshold
950+
| NoItem(_) => queueItem->queueItemBlockNumber > highestBlockBelowThreshold
951951
}
952952
}
953953
}

codegenerator/cli/npm/envio/src/ReorgDetection.res

Lines changed: 104 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ type blockData = {
1515
external generalizeBlockDataWithTimestamp: blockDataWithTimestamp => blockData = "%identity"
1616

1717
type reorgGuard = {
18-
lastBlockScannedData: blockData,
19-
firstBlockParentNumberAndHash: option<blockData>,
18+
rangeLastBlock: blockData,
19+
prevRangeLastBlock: option<blockData>,
2020
}
2121

2222
type reorgDetected = {
@@ -36,22 +36,31 @@ let reorgDetectedToLogParams = (reorgDetected: reorgDetected, ~shouldRollbackOnR
3636
}
3737
}
3838

39+
type reorgResult = NoReorg | ReorgDetected(reorgDetected)
40+
type validBlockError = NotFound | AlreadyReorgedHashes
41+
type validBlockResult = result<blockDataWithTimestamp, validBlockError>
42+
3943
module LastBlockScannedHashes: {
4044
type t
4145
/**Instantiat t with existing data*/
42-
let makeWithData: (array<blockData>, ~confirmedBlockThreshold: int) => t
46+
let makeWithData: (
47+
array<blockData>,
48+
~confirmedBlockThreshold: int,
49+
~detectedReorgBlock: blockData=?,
50+
) => t
4351

4452
/**Instantiat empty t with no block data*/
4553
let empty: (~confirmedBlockThreshold: int) => t
4654

47-
/** Registers a new reorg guard, prunes unnened data and returns the updated data
48-
or an error if a reorg has occured
49-
*/
55+
/** Registers a new reorg guard, prunes unneeded data, and returns the updated state.
56+
* Resets internal state if shouldRollbackOnReorg is false (detect-only mode)
57+
*/
5058
let registerReorgGuard: (
5159
t,
5260
~reorgGuard: reorgGuard,
5361
~currentBlockHeight: int,
54-
) => result<t, reorgDetected>
62+
~shouldRollbackOnReorg: bool,
63+
) => (t, reorgResult)
5564

5665
/**
5766
Returns the latest block data which matches block number and hashes in the provided array
@@ -61,7 +70,7 @@ module LastBlockScannedHashes: {
6170
t,
6271
~blockNumbersAndHashes: array<blockDataWithTimestamp>,
6372
~currentBlockHeight: int,
64-
) => option<blockDataWithTimestamp>
73+
) => validBlockResult
6574

6675
let getThresholdBlockNumbers: (t, ~currentBlockHeight: int) => array<int>
6776

@@ -76,9 +85,14 @@ module LastBlockScannedHashes: {
7685
// A hash map of recent blockdata by block number to make comparison checks
7786
// for reorgs.
7887
dataByBlockNumber: dict<blockData>,
88+
// The latest block which detected a reorg
89+
// and should never be valid.
90+
// We keep track of this to avoid responses
91+
// with the stale data from other data-source instances.
92+
detectedReorgBlock: option<blockData>,
7993
}
8094

81-
let makeWithData = (blocks, ~confirmedBlockThreshold) => {
95+
let makeWithData = (blocks, ~confirmedBlockThreshold, ~detectedReorgBlock=?) => {
8296
let dataByBlockNumber = Js.Dict.empty()
8397

8498
blocks->Belt.Array.forEach(block => {
@@ -88,12 +102,14 @@ module LastBlockScannedHashes: {
88102
{
89103
confirmedBlockThreshold,
90104
dataByBlockNumber,
105+
detectedReorgBlock,
91106
}
92107
}
93108
//Instantiates empty LastBlockHashes
94109
let empty = (~confirmedBlockThreshold) => {
95110
confirmedBlockThreshold,
96111
dataByBlockNumber: Js.Dict.empty(),
112+
detectedReorgBlock: None,
97113
}
98114

99115
let getDataByBlockNumberCopyInThreshold = (
@@ -122,33 +138,33 @@ module LastBlockScannedHashes: {
122138
{confirmedBlockThreshold} as self: t,
123139
~reorgGuard: reorgGuard,
124140
~currentBlockHeight,
141+
~shouldRollbackOnReorg,
125142
) => {
126143
let dataByBlockNumberCopyInThreshold =
127144
self->getDataByBlockNumberCopyInThreshold(~currentBlockHeight)
128145

129-
let {lastBlockScannedData, firstBlockParentNumberAndHash} = reorgGuard
146+
let {rangeLastBlock, prevRangeLastBlock} = reorgGuard
130147

131148
let maybeReorgDetected = switch dataByBlockNumberCopyInThreshold->Utils.Dict.dangerouslyGetNonOption(
132-
lastBlockScannedData.blockNumber->Int.toString,
149+
rangeLastBlock.blockNumber->Int.toString,
133150
) {
134-
| Some(scannedBlock) if scannedBlock.blockHash !== lastBlockScannedData.blockHash =>
151+
| Some(scannedBlock) if scannedBlock.blockHash !== rangeLastBlock.blockHash =>
135152
Some({
136-
receivedBlock: lastBlockScannedData,
153+
receivedBlock: rangeLastBlock,
137154
scannedBlock,
138155
})
139156
| _ =>
140-
switch firstBlockParentNumberAndHash {
157+
switch prevRangeLastBlock {
141158
//If parentHash is None, then it's the genesis block (no reorg)
142159
//Need to check that parentHash matches because of the dynamic contracts
143160
| None => None
144-
| Some(firstBlockParentNumberAndHash) =>
161+
| Some(prevRangeLastBlock) =>
145162
switch dataByBlockNumberCopyInThreshold->Utils.Dict.dangerouslyGetNonOption(
146-
firstBlockParentNumberAndHash.blockNumber->Int.toString,
163+
prevRangeLastBlock.blockNumber->Int.toString,
147164
) {
148-
| Some(scannedBlock)
149-
if scannedBlock.blockHash !== firstBlockParentNumberAndHash.blockHash =>
165+
| Some(scannedBlock) if scannedBlock.blockHash !== prevRangeLastBlock.blockHash =>
150166
Some({
151-
receivedBlock: firstBlockParentNumberAndHash,
167+
receivedBlock: prevRangeLastBlock,
152168
scannedBlock,
153169
})
154170
| _ => None
@@ -157,25 +173,37 @@ module LastBlockScannedHashes: {
157173
}
158174

159175
switch maybeReorgDetected {
160-
| Some(reorgDetected) => Error(reorgDetected)
176+
| Some(reorgDetected) => (
177+
shouldRollbackOnReorg
178+
? {
179+
...self,
180+
detectedReorgBlock: Some(reorgDetected.scannedBlock),
181+
}
182+
: empty(~confirmedBlockThreshold),
183+
ReorgDetected(reorgDetected),
184+
)
161185
| None => {
162186
dataByBlockNumberCopyInThreshold->Js.Dict.set(
163-
lastBlockScannedData.blockNumber->Int.toString,
164-
lastBlockScannedData,
187+
rangeLastBlock.blockNumber->Int.toString,
188+
rangeLastBlock,
165189
)
166-
switch firstBlockParentNumberAndHash {
190+
switch prevRangeLastBlock {
167191
| None => ()
168-
| Some(firstBlockParentNumberAndHash) =>
192+
| Some(prevRangeLastBlock) =>
169193
dataByBlockNumberCopyInThreshold->Js.Dict.set(
170-
firstBlockParentNumberAndHash.blockNumber->Int.toString,
171-
firstBlockParentNumberAndHash,
194+
prevRangeLastBlock.blockNumber->Int.toString,
195+
prevRangeLastBlock,
172196
)
173197
}
174198

175-
Ok({
176-
confirmedBlockThreshold,
177-
dataByBlockNumber: dataByBlockNumberCopyInThreshold,
178-
})
199+
(
200+
{
201+
confirmedBlockThreshold,
202+
dataByBlockNumber: dataByBlockNumberCopyInThreshold,
203+
detectedReorgBlock: None,
204+
},
205+
NoReorg,
206+
)
179207
}
180208
}
181209
}
@@ -186,39 +214,58 @@ module LastBlockScannedHashes: {
186214
~currentBlockHeight,
187215
) => {
188216
let verifiedDataByBlockNumber = Js.Dict.empty()
189-
blockNumbersAndHashes->Array.forEach(blockData => {
217+
for idx in 0 to blockNumbersAndHashes->Array.length - 1 {
218+
let blockData = blockNumbersAndHashes->Array.getUnsafe(idx)
190219
verifiedDataByBlockNumber->Js.Dict.set(blockData.blockNumber->Int.toString, blockData)
191-
})
220+
}
192221

193-
let dataByBlockNumber = self->getDataByBlockNumberCopyInThreshold(~currentBlockHeight)
194-
// Js engine automatically orders numeric object keys
195-
let ascBlockNumberKeys = dataByBlockNumber->Js.Dict.keys
222+
let isAlreadyReorgedResponse = switch self.detectedReorgBlock {
223+
| Some(detectedReorgBlock) =>
224+
switch verifiedDataByBlockNumber->Utils.Dict.dangerouslyGetNonOption(
225+
detectedReorgBlock.blockNumber->Int.toString,
226+
) {
227+
| Some(verifiedBlockData) => verifiedBlockData.blockHash === detectedReorgBlock.blockHash
228+
| None => false
229+
}
230+
| None => false
231+
}
196232

197-
let getPrevScannedBlock = idx =>
198-
ascBlockNumberKeys
199-
->Belt.Array.get(idx - 1)
200-
->Option.flatMap(key => {
201-
// We should already validate that the block number is verified at the point
202-
verifiedDataByBlockNumber->Utils.Dict.dangerouslyGetNonOption(key)
203-
})
233+
if isAlreadyReorgedResponse {
234+
Error(AlreadyReorgedHashes)
235+
} else {
236+
let dataByBlockNumber = self->getDataByBlockNumberCopyInThreshold(~currentBlockHeight)
237+
// Js engine automatically orders numeric object keys
238+
let ascBlockNumberKeys = dataByBlockNumber->Js.Dict.keys
239+
240+
let getPrevScannedBlock = idx =>
241+
switch ascBlockNumberKeys
242+
->Belt.Array.get(idx - 1)
243+
->Option.flatMap(key => {
244+
// We should already validate that the block number is verified at the point
245+
verifiedDataByBlockNumber->Utils.Dict.dangerouslyGetNonOption(key)
246+
}) {
247+
| Some(data) => Ok(data)
248+
| None => Error(NotFound)
249+
}
204250

205-
let rec loop = idx => {
206-
switch ascBlockNumberKeys->Belt.Array.get(idx) {
207-
| Some(blockNumberKey) =>
208-
let scannedBlock = dataByBlockNumber->Js.Dict.unsafeGet(blockNumberKey)
209-
switch verifiedDataByBlockNumber->Utils.Dict.dangerouslyGetNonOption(blockNumberKey) {
210-
| None =>
211-
Js.Exn.raiseError(
212-
`Unexpected case. Couldn't find verified hash for block number ${blockNumberKey}`,
213-
)
214-
| Some(verifiedBlockData) if verifiedBlockData.blockHash === scannedBlock.blockHash =>
215-
loop(idx + 1)
216-
| Some(_) => getPrevScannedBlock(idx)
251+
let rec loop = idx => {
252+
switch ascBlockNumberKeys->Belt.Array.get(idx) {
253+
| Some(blockNumberKey) =>
254+
let scannedBlock = dataByBlockNumber->Js.Dict.unsafeGet(blockNumberKey)
255+
switch verifiedDataByBlockNumber->Utils.Dict.dangerouslyGetNonOption(blockNumberKey) {
256+
| None =>
257+
Js.Exn.raiseError(
258+
`Unexpected case. Couldn't find verified hash for block number ${blockNumberKey}`,
259+
)
260+
| Some(verifiedBlockData) if verifiedBlockData.blockHash === scannedBlock.blockHash =>
261+
loop(idx + 1)
262+
| Some(_) => getPrevScannedBlock(idx)
263+
}
264+
| None => getPrevScannedBlock(idx)
217265
}
218-
| None => getPrevScannedBlock(idx)
219266
}
267+
loop(0)
220268
}
221-
loop(0)
222269
}
223270

224271
/**
@@ -254,6 +301,7 @@ module LastBlockScannedHashes: {
254301
{
255302
confirmedBlockThreshold,
256303
dataByBlockNumber: newDataByBlockNumber,
304+
detectedReorgBlock: None,
257305
}
258306
}
259307

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ let hasNoMoreEventsToProcess = (self: t) => {
443443
self.fetchState->FetchState.queueSize === 0
444444
}
445445

446-
let getHeighestBlockBelowThreshold = (cf: t): int => {
446+
let getHighestBlockBelowThreshold = (cf: t): int => {
447447
let highestBlockBelowThreshold = cf.currentBlockHeight - cf.chainConfig.confirmedBlockThreshold
448448
highestBlockBelowThreshold < 0 ? 0 : highestBlockBelowThreshold
449449
}
@@ -475,7 +475,7 @@ let getLastKnownValidBlock = async (
475475
}
476476

477477
let fallback = async () => {
478-
switch await getBlockHashes([chainFetcher->getHeighestBlockBelowThreshold]) {
478+
switch await getBlockHashes([chainFetcher->getHighestBlockBelowThreshold]) {
479479
| [block] => block
480480
| _ =>
481481
Js.Exn.raiseError(
@@ -487,15 +487,27 @@ let getLastKnownValidBlock = async (
487487
switch scannedBlockNumbers {
488488
| [] => await fallback()
489489
| _ => {
490-
let blockNumbersAndHashes = await getBlockHashes(scannedBlockNumbers)
491-
492-
switch chainFetcher.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.getLatestValidScannedBlock(
493-
~blockNumbersAndHashes,
494-
~currentBlockHeight=chainFetcher.currentBlockHeight,
495-
) {
496-
| Some(block) => block
497-
| None => await fallback()
490+
let blockRef = ref(None)
491+
492+
while blockRef.contents->Option.isNone {
493+
let blockNumbersAndHashes = await getBlockHashes(scannedBlockNumbers)
494+
495+
switch chainFetcher.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.getLatestValidScannedBlock(
496+
~blockNumbersAndHashes,
497+
~currentBlockHeight=chainFetcher.currentBlockHeight,
498+
) {
499+
| Ok(block) => blockRef := Some(block)
500+
| Error(NotFound) => blockRef := Some(await fallback())
501+
| Error(AlreadyReorgedHashes) =>
502+
let delayMilliseconds = 100
503+
chainFetcher.logger->Logging.childTrace(
504+
`Failed to find a valid block to rollback to, since received already reorged hashes from another HyperSync instance. HyperSync has multiple instances and it's possible that they drift independently slightly from the head. Indexing should continue correctly after retrying the query in ${delayMilliseconds->Int.toString}ms.`,
505+
)
506+
await Utils.delay(delayMilliseconds)
507+
}
498508
}
509+
510+
blockRef.contents->Option.getUnsafe
499511
}
500512
}
501513
}

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type isInReorgThresholdRes<'payload> = {
6767

6868
type fetchStateWithData = {
6969
fetchState: FetchState.t,
70-
heighestBlockBelowThreshold: int,
70+
highestBlockBelowThreshold: int,
7171
currentBlockHeight: int,
7272
}
7373

@@ -78,7 +78,7 @@ let isQueueItemEarlierUnorderedBelowReorgThreshold = (
7878
let data = fetchStatesMap->ChainMap.get(item.chain)
7979
item.earliestEvent->FetchState.queueItemIsInReorgThreshold(
8080
~currentBlockHeight=data.currentBlockHeight,
81-
~heighestBlockBelowThreshold=data.heighestBlockBelowThreshold,
81+
~highestBlockBelowThreshold=data.highestBlockBelowThreshold,
8282
)
8383
}
8484
// The idea here is if we are in undordered multichain mode, always prioritize queue
@@ -112,7 +112,7 @@ let determineNextEvent = (
112112
->ChainMap.entries
113113
->Array.reduce({isInReorgThreshold: false, val: None}, (
114114
accum,
115-
(chain, {fetchState, currentBlockHeight, heighestBlockBelowThreshold}),
115+
(chain, {fetchState, currentBlockHeight, highestBlockBelowThreshold}),
116116
) => {
117117
// If the fetch state has reached the end block we don't need to consider it
118118
if fetchState->FetchState.isActivelyIndexing {
@@ -124,7 +124,7 @@ let determineNextEvent = (
124124
let isInReorgThreshold =
125125
earliestEvent->FetchState.queueItemIsInReorgThreshold(
126126
~currentBlockHeight,
127-
~heighestBlockBelowThreshold,
127+
~highestBlockBelowThreshold,
128128
)
129129

130130
{
@@ -225,7 +225,7 @@ let getFetchStateWithData = (self: t, ~shouldDeepCopy=false): ChainMap.t<fetchSt
225225
self.chainFetchers->ChainMap.map(cf => {
226226
{
227227
fetchState: shouldDeepCopy ? cf.fetchState->FetchState.copy : cf.fetchState,
228-
heighestBlockBelowThreshold: cf->ChainFetcher.getHeighestBlockBelowThreshold,
228+
highestBlockBelowThreshold: cf->ChainFetcher.getHighestBlockBelowThreshold,
229229
currentBlockHeight: cf.currentBlockHeight,
230230
}
231231
})
@@ -363,7 +363,7 @@ let getSafeChainIdAndBlockNumberArray = (self: t): array<
363363
->ChainMap.values
364364
->Array.map((cf): DbFunctions.EntityHistory.chainIdAndBlockNumber => {
365365
chainId: cf.chainConfig.chain->ChainMap.Chain.toChainId,
366-
blockNumber: cf->ChainFetcher.getHeighestBlockBelowThreshold,
366+
blockNumber: cf->ChainFetcher.getHighestBlockBelowThreshold,
367367
})
368368
}
369369

0 commit comments

Comments
 (0)