Skip to content

Commit d682d59

Browse files
authored
Attempt fallback source when query is not retriable (#799)
1 parent 3a55d07 commit d682d59

File tree

6 files changed

+142
-34
lines changed

6 files changed

+142
-34
lines changed

codegenerator/cli/npm/envio/src/sources/HyperFuelSource.res

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,10 @@ let make = ({chain, endpointUrl}: options): t => {
258258
backoffMillis,
259259
})
260260
| UnexpectedMissingParams({missingParams}) =>
261-
WithBackoff({
262-
message: `Received page response with invalid data. Attempt a retry. Missing params: ${missingParams->Js.Array2.joinWith(
263-
",",
261+
ImpossibleForTheQuery({
262+
message: `Source returned invalid data with missing required fields: ${missingParams->Js.Array2.joinWith(
263+
", ",
264264
)}`,
265-
backoffMillis: switch retry {
266-
| 0 => 1000
267-
| _ => 4000 * retry
268-
},
269265
})
270266
},
271267
}),

codegenerator/cli/npm/envio/src/sources/HyperSyncSource.res

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,10 @@ let make = (
288288
backoffMillis,
289289
})
290290
| UnexpectedMissingParams({missingParams}) =>
291-
WithBackoff({
292-
message: `Received page response with invalid data. Attempt a retry. Missing params: ${missingParams->Js.Array2.joinWith(
293-
",",
291+
ImpossibleForTheQuery({
292+
message: `Source returned invalid data with missing required fields: ${missingParams->Js.Array2.joinWith(
293+
", ",
294294
)}`,
295-
backoffMillis: switch retry {
296-
| 0 => 1000
297-
| _ => 4000 * retry
298-
},
299295
})
300296
},
301297
}),

codegenerator/cli/npm/envio/src/sources/RpcSource.res

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -678,11 +678,12 @@ let make = (
678678
| exn =>
679679
raise(
680680
Source.GetItemsError(
681-
FailedParsingItems({
682-
message: "Failed to parse events using hypersync client decoder. Please double-check your ABI.",
681+
FailedGettingItems({
683682
exn,
684-
blockNumber: fromBlock,
685-
logIndex: 0,
683+
attemptedToBlock: toBlock,
684+
retry: ImpossibleForTheQuery({
685+
message: "Failed to parse events using hypersync client decoder. Please double-check your ABI.",
686+
}),
686687
}),
687688
),
688689
)
@@ -812,11 +813,12 @@ let make = (
812813
| exn =>
813814
raise(
814815
Source.GetItemsError(
815-
FailedParsingItems({
816-
message: "Failed to parse event with viem, please double-check your ABI.",
816+
FailedGettingItems({
817817
exn,
818-
blockNumber,
819-
logIndex,
818+
attemptedToBlock: toBlock,
819+
retry: ImpossibleForTheQuery({
820+
message: `Failed to parse event with viem, please double-check your ABI. Block number: ${blockNumber->Int.toString}, log index: ${logIndex->Int.toString}`,
821+
}),
820822
}),
821823
),
822824
)

codegenerator/cli/npm/envio/src/sources/Source.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ type blockRangeFetchResponse = {
2323
type getItemsRetry =
2424
| WithSuggestedToBlock({toBlock: int})
2525
| WithBackoff({message: string, backoffMillis: int})
26+
| ImpossibleForTheQuery({message: string})
2627

2728
type getItemsError =
2829
| UnsupportedSelection({message: string})
2930
| FailedGettingFieldSelection({exn: exn, blockNumber: int, logIndex: int, message: string})
30-
| FailedParsingItems({exn: exn, blockNumber: int, logIndex: int, message: string})
3131
| FailedGettingItems({exn: exn, attemptedToBlock: int, retry: getItemsRetry})
3232

3333
exception GetItemsError(getItemsError)

codegenerator/cli/npm/envio/src/sources/SourceManager.res

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ let getNextSyncSource = (
304304
sourceManager,
305305
// This is needed to include the Fallback source to rotation
306306
~initialSource,
307+
~currentSource,
307308
// After multiple failures start returning fallback sources as well
308309
// But don't try it when main sync sources fail because of invalid configuration
309310
// note: The logic might be changed in the future
@@ -315,7 +316,7 @@ let getNextSyncSource = (
315316
let hasActive = ref(false)
316317

317318
sourceManager.sources->Utils.Set.forEach(source => {
318-
if source === sourceManager.activeSource {
319+
if source === currentSource {
319320
hasActive := true
320321
} else if (
321322
switch source.sourceFor {
@@ -332,7 +333,7 @@ let getNextSyncSource = (
332333
| None =>
333334
switch before->Array.get(0) {
334335
| Some(s) => s
335-
| None => sourceManager.activeSource
336+
| None => currentSource
336337
}
337338
}
338339
}
@@ -349,9 +350,11 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
349350
let responseRef = ref(None)
350351
let retryRef = ref(0)
351352
let initialSource = sourceManager.activeSource
353+
let sourceRef = ref(initialSource)
354+
let shouldUpdateActiveSource = ref(false)
352355

353356
while responseRef.contents->Option.isNone {
354-
let source = sourceManager.activeSource
357+
let source = sourceRef.contents
355358
let toBlock = toBlockRef.contents
356359
let retry = retryRef.contents
357360

@@ -391,9 +394,8 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
391394
| Source.GetItemsError(error) =>
392395
switch error {
393396
| UnsupportedSelection(_)
394-
| FailedGettingFieldSelection(_)
395-
| FailedParsingItems(_) => {
396-
let nextSource = sourceManager->getNextSyncSource(~initialSource)
397+
| FailedGettingFieldSelection(_) => {
398+
let nextSource = sourceManager->getNextSyncSource(~initialSource, ~currentSource=source)
397399

398400
// These errors are impossible to recover, so we delete the source
399401
// from sourceManager so it's not attempted anymore
@@ -404,8 +406,7 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
404406
if notAlreadyDeleted {
405407
switch error {
406408
| UnsupportedSelection({message}) => logger->Logging.childError(message)
407-
| FailedGettingFieldSelection({exn, message, blockNumber, logIndex})
408-
| FailedParsingItems({exn, message, blockNumber, logIndex}) =>
409+
| FailedGettingFieldSelection({exn, message, blockNumber, logIndex}) =>
409410
logger->Logging.childError({
410411
"msg": message,
411412
"err": exn->Utils.prettifyExn,
@@ -426,7 +427,8 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
426427
"msg": "Switching to another data-source",
427428
"source": nextSource.name,
428429
})
429-
sourceManager.activeSource = nextSource
430+
sourceRef := nextSource
431+
shouldUpdateActiveSource := true
430432
retryRef := 0
431433
}
432434
}
@@ -438,6 +440,33 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
438440
})
439441
toBlockRef := Some(toBlock)
440442
retryRef := 0
443+
| FailedGettingItems({exn, attemptedToBlock, retry: ImpossibleForTheQuery({message})}) =>
444+
let nextSource =
445+
sourceManager->getNextSyncSource(
446+
~initialSource,
447+
~currentSource=source,
448+
~attemptFallbacks=true,
449+
)
450+
451+
let hasAnotherSource = nextSource !== initialSource
452+
453+
logger->Logging.childWarn({
454+
"msg": message ++ (hasAnotherSource ? " - Attempting to another source" : ""),
455+
"toBlock": attemptedToBlock,
456+
"err": exn->Utils.prettifyExn,
457+
})
458+
459+
if !hasAnotherSource {
460+
%raw(`null`)->ErrorHandling.mkLogAndRaise(
461+
~logger,
462+
~msg="The indexer doesn't have data-sources which can continue fetching. Please, check the error logs or reach out to the Envio team.",
463+
)
464+
} else {
465+
sourceRef := nextSource
466+
shouldUpdateActiveSource := false
467+
retryRef := 0
468+
}
469+
441470
| FailedGettingItems({exn, attemptedToBlock, retry: WithBackoff({message, backoffMillis})}) =>
442471
// Starting from the 11th failure (retry=10)
443472
// include fallback sources for switch
@@ -454,7 +483,11 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
454483
| _ =>
455484
// Then try to switch every second failure
456485
if retry->mod(2) === 0 {
457-
sourceManager->getNextSyncSource(~initialSource, ~attemptFallbacks)
486+
sourceManager->getNextSyncSource(
487+
~initialSource,
488+
~attemptFallbacks,
489+
~currentSource=source,
490+
)
458491
} else {
459492
source
460493
}
@@ -476,16 +509,22 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
476509
"msg": "Switching to another data-source",
477510
"source": nextSource.name,
478511
})
479-
sourceManager.activeSource = nextSource
512+
sourceRef := nextSource
513+
shouldUpdateActiveSource := true
480514
} else {
481515
await Utils.delay(Pervasives.min(backoffMillis, 60_000))
482516
}
483517
retryRef := retryRef.contents + 1
484518
}
519+
485520
// TODO: Handle more error cases and hang/retry instead of throwing
486521
| exn => exn->ErrorHandling.mkLogAndRaise(~logger, ~msg="Failed to fetch block Range")
487522
}
488523
}
489524

525+
if shouldUpdateActiveSource.contents {
526+
sourceManager.activeSource = sourceRef.contents
527+
}
528+
490529
responseRef.contents->Option.getUnsafe
491530
}

scenarios/test_codegen/test/E2E_test.res

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,4 +558,79 @@ describe("E2E tests", () => {
558558
~message="Shouldn't increment on invalidation",
559559
)
560560
})
561+
562+
Async.it(
563+
"Should attempt fallback source when primary source fails with missing params",
564+
async () => {
565+
let sourceMockPrimary = Mock.Source.make(
566+
[#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes],
567+
~chain=#1337,
568+
)
569+
let sourceMockFallback = Mock.Source.make(
570+
[#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes],
571+
~chain=#1337,
572+
)
573+
let indexerMock = await Mock.Indexer.make(
574+
~chains=[
575+
{
576+
chain: #1337,
577+
sources: [sourceMockPrimary.source, sourceMockFallback.source],
578+
},
579+
],
580+
)
581+
await Utils.delay(0)
582+
583+
// Resolve initial height request from primary source
584+
Assert.deepEqual(
585+
sourceMockPrimary.getHeightOrThrowCalls->Array.length,
586+
1,
587+
~message="should have called getHeightOrThrow on primary source",
588+
)
589+
sourceMockPrimary.resolveGetHeightOrThrow(300)
590+
await Utils.delay(0)
591+
await Utils.delay(0)
592+
593+
// Primary source should now attempt to fetch items
594+
Assert.deepEqual(
595+
sourceMockPrimary.getItemsOrThrowCalls->Array.length,
596+
1,
597+
~message="should have called getItemsOrThrow on primary source",
598+
)
599+
600+
// Simulate missing params error from HyperSync (converted to InvalidData by the source)
601+
sourceMockPrimary.rejectGetItemsOrThrow(
602+
Source.GetItemsError(
603+
FailedGettingItems({
604+
exn: %raw(`null`),
605+
attemptedToBlock: 100,
606+
retry: ImpossibleForTheQuery({
607+
message: "Source returned invalid data with missing required fields: log.address",
608+
}),
609+
}),
610+
),
611+
)
612+
await Utils.delay(0)
613+
await Utils.delay(0)
614+
615+
// The fallback source should now be called immediately
616+
Assert.deepEqual(
617+
sourceMockFallback.getItemsOrThrowCalls->Array.length,
618+
1,
619+
~message="fallback source should be called after primary fails with invalid data",
620+
)
621+
622+
// Resolve the fallback source successfully
623+
sourceMockFallback.resolveGetItemsOrThrow([], ~latestFetchedBlockNumber=100)
624+
await indexerMock.getBatchWritePromise()
625+
626+
Assert.deepEqual(
627+
(
628+
sourceMockPrimary.getItemsOrThrowCalls->Array.length,
629+
sourceMockFallback.getItemsOrThrowCalls->Array.length,
630+
),
631+
(2, 1),
632+
~message="Shouldn't switch to fallback source for the next query",
633+
)
634+
},
635+
)
561636
})

0 commit comments

Comments
 (0)