-
Notifications
You must be signed in to change notification settings - Fork 28
2x faster historical sync with RPC source #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6ff19d2
81ab74b
313325f
10ec0fe
c49df32
8b9a978
fa5fe3b
b6e4468
54f8e25
612c470
7770d60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,13 @@ let getKnownBlock = (provider, blockNumber) => | |
| } | ||
| ) | ||
|
|
||
| let rec getKnownBlockWithBackoff = async (~provider, ~sourceName, ~chain, ~blockNumber, ~backoffMsOnFailure) => | ||
| let rec getKnownBlockWithBackoff = async ( | ||
| ~provider, | ||
| ~sourceName, | ||
| ~chain, | ||
| ~blockNumber, | ||
| ~backoffMsOnFailure, | ||
| ) => | ||
| switch await getKnownBlock(provider, blockNumber) { | ||
| | exception err => | ||
| Logging.warn({ | ||
|
|
@@ -88,19 +94,24 @@ let getSuggestedBlockIntervalFromExn = { | |
| // - Optimism: "backend response too large" or "Block range is too large" | ||
| // - Arbitrum: "logs matched by query exceeds limit of 10000" | ||
|
|
||
| exn => | ||
| (exn): option<( | ||
| // The suggested block range | ||
| int, | ||
| // Whether it's the max range that the provider allows | ||
| bool, | ||
| )> => | ||
| switch exn { | ||
| | Js.Exn.Error(error) => | ||
| try { | ||
| let message: string = (error->Obj.magic)["error"]["message"] | ||
| message->S.assertOrThrow(S.string) | ||
|
|
||
| // Helper to extract block range from regex match | ||
| let extractBlockRange = execResult => | ||
| let extractBlockRange = (execResult, ~isMaxRange) => | ||
| switch execResult->Js.Re.captures { | ||
| | [_, Js.Nullable.Value(blockRangeLimit)] => | ||
| switch blockRangeLimit->Int.fromString { | ||
| | Some(blockRangeLimit) if blockRangeLimit > 0 => Some(blockRangeLimit) | ||
| | Some(blockRangeLimit) if blockRangeLimit > 0 => Some(blockRangeLimit, isMaxRange) | ||
| | _ => None | ||
| } | ||
|
Comment on lines
+110
to
116
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Bug: incorrect Option tuple construction (Some needs a single tuple argument). In ReScript, option<(int, bool)> must be constructed as Some((x, y)), not Some(x, y). Current code won’t type-check and will fail at build time. Apply:
- | Some(blockRangeLimit) if blockRangeLimit > 0 => Some(blockRangeLimit, isMaxRange)
+ | Some(blockRangeLimit) if blockRangeLimit > 0 => Some((blockRangeLimit, isMaxRange))
- | (Some(fromBlock), Some(toBlock)) if toBlock >= fromBlock =>
- Some(toBlock - fromBlock + 1, false)
+ | (Some(fromBlock), Some(toBlock)) if toBlock >= fromBlock =>
+ Some((toBlock - fromBlock + 1, false))
- | Some(_) => Some(2000, true)
+ | Some(_) => Some((2000, true))
- | Some(_) => Some(10000, true)
+ | Some(_) => Some((10000, true))
Also applies to: 127-128, 153-154, 159-160 🤖 Prompt for AI Agents |
||
| | _ => None | ||
|
|
@@ -113,48 +124,49 @@ let getSuggestedBlockIntervalFromExn = { | |
| | [_, Js.Nullable.Value(fromBlock), Js.Nullable.Value(toBlock)] => | ||
| switch (fromBlock->Int.fromString, toBlock->Int.fromString) { | ||
| | (Some(fromBlock), Some(toBlock)) if toBlock >= fromBlock => | ||
| Some(toBlock - fromBlock + 1) | ||
| Some(toBlock - fromBlock + 1, false) | ||
| | _ => None | ||
| } | ||
| | _ => None | ||
| } | ||
| | None => | ||
| // Try each provider's specific error pattern | ||
| switch blockRangeLimitRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch alchemyRangeRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch cloudflareRangeRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch thirdwebRangeRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch blockpiRangeRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch maxAllowedBlocksRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch baseRangeRegExp->Js.Re.exec_(message) { | ||
| | Some(_) => Some(2000) | ||
| | Some(_) => Some(2000, true) | ||
| | None => | ||
| switch blastPaidRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| switch blastPaidRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch chainstackRegExp->Js.Re.exec_(message) { | ||
| | Some(_) => Some(10000) | ||
| | Some(_) => Some(10000, true) | ||
| | None => | ||
| switch coinbaseRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch publicNodeRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => | ||
| switch hyperliquidRegExp->Js.Re.exec_(message) { | ||
| | Some(execResult) => extractBlockRange(execResult) | ||
| | Some(execResult) => | ||
| extractBlockRange(execResult, ~isMaxRange=true) | ||
| | None => None | ||
| } | ||
| } | ||
|
|
@@ -181,15 +193,17 @@ type eventBatchQuery = { | |
| latestFetchedBlock: Ethers.JsonRpcProvider.block, | ||
| } | ||
|
|
||
| let maxSuggestedBlockIntervalKey = "max" | ||
|
|
||
| let getNextPage = ( | ||
| ~fromBlock, | ||
| ~toBlock, | ||
| ~addresses, | ||
| ~topicQuery, | ||
| ~loadBlock, | ||
| ~syncConfig as sc: Config.syncConfig, | ||
| ~syncConfig as sc: InternalConfig.sourceSync, | ||
| ~provider, | ||
| ~suggestedBlockIntervals, | ||
| ~mutSuggestedBlockIntervals, | ||
| ~partitionId, | ||
| ): promise<eventBatchQuery> => { | ||
| //If the query hangs for longer than this, reject this promise to reduce the block interval | ||
|
|
@@ -224,8 +238,11 @@ let getNextPage = ( | |
| ->Promise.race | ||
| ->Promise.catch(err => { | ||
| switch getSuggestedBlockIntervalFromExn(err) { | ||
| | Some(nextBlockIntervalTry) => | ||
| suggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry) | ||
| | Some((nextBlockIntervalTry, isMaxRange)) => | ||
| mutSuggestedBlockIntervals->Js.Dict.set( | ||
| isMaxRange ? maxSuggestedBlockIntervalKey : partitionId, | ||
| nextBlockIntervalTry, | ||
| ) | ||
| raise( | ||
| Source.GetItemsError( | ||
| FailedGettingItems({ | ||
|
|
@@ -241,7 +258,7 @@ let getNextPage = ( | |
| let executedBlockInterval = toBlock - fromBlock + 1 | ||
| let nextBlockIntervalTry = | ||
| (executedBlockInterval->Belt.Int.toFloat *. sc.backoffMultiplicative)->Belt.Int.fromFloat | ||
| suggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry) | ||
| mutSuggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry) | ||
| raise( | ||
| Source.GetItemsError( | ||
| Source.FailedGettingItems({ | ||
|
|
@@ -351,11 +368,8 @@ let memoGetSelectionConfig = (~chain) => { | |
| } | ||
|
|
||
| let makeThrowingGetEventBlock = (~getBlock) => { | ||
| // The block fields type is a subset of Ethers.JsonRpcProvider.block so we can safely cast | ||
| let blockFieldsFromBlock: Ethers.JsonRpcProvider.block => Internal.eventBlock = Utils.magic | ||
|
|
||
| async (log: Ethers.log): Internal.eventBlock => { | ||
| (await getBlock(log.blockNumber))->blockFieldsFromBlock | ||
| async (log: Ethers.log) => { | ||
| await getBlock(log.blockNumber) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -434,7 +448,7 @@ let sanitizeUrl = (url: string) => { | |
|
|
||
| type options = { | ||
| sourceFor: Source.sourceFor, | ||
| syncConfig: Config.syncConfig, | ||
| syncConfig: InternalConfig.sourceSync, | ||
| url: string, | ||
| chain: ChainMap.Chain.t, | ||
| contracts: array<Internal.evmContractConfig>, | ||
|
|
@@ -455,7 +469,7 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options | |
|
|
||
| let getSelectionConfig = memoGetSelectionConfig(~chain) | ||
|
|
||
| let suggestedBlockIntervals = Js.Dict.empty() | ||
| let mutSuggestedBlockIntervals = Js.Dict.empty() | ||
|
|
||
| let transactionLoader = LazyLoader.make( | ||
| ~loaderFn=transactionHash => provider->Ethers.JsonRpcProvider.getTransaction(~transactionHash), | ||
|
|
@@ -478,7 +492,13 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options | |
|
|
||
| let blockLoader = LazyLoader.make( | ||
| ~loaderFn=blockNumber => | ||
| getKnownBlockWithBackoff(~provider, ~sourceName=name, ~chain, ~backoffMsOnFailure=1000, ~blockNumber), | ||
| getKnownBlockWithBackoff( | ||
| ~provider, | ||
| ~sourceName=name, | ||
| ~chain, | ||
| ~backoffMsOnFailure=1000, | ||
| ~blockNumber, | ||
| ), | ||
| ~onError=(am, ~exn) => { | ||
| Logging.error({ | ||
| "err": exn, | ||
|
|
@@ -523,10 +543,15 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options | |
| ) => { | ||
| let startFetchingBatchTimeRef = Hrtime.makeTimer() | ||
|
|
||
| let suggestedBlockInterval = | ||
| suggestedBlockIntervals | ||
| let suggestedBlockInterval = switch mutSuggestedBlockIntervals->Utils.Dict.dangerouslyGetNonOption( | ||
| maxSuggestedBlockIntervalKey, | ||
| ) { | ||
| | Some(maxSuggestedBlockInterval) => maxSuggestedBlockInterval | ||
| | None => | ||
| mutSuggestedBlockIntervals | ||
| ->Utils.Dict.dangerouslyGetNonOption(partitionId) | ||
| ->Belt.Option.getWithDefault(syncConfig.initialBlockInterval) | ||
| } | ||
|
|
||
| // Always have a toBlock for an RPC worker | ||
| let toBlock = switch toBlock { | ||
|
|
@@ -554,18 +579,22 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options | |
| ~loadBlock=blockNumber => blockLoader->LazyLoader.get(blockNumber), | ||
| ~syncConfig, | ||
| ~provider, | ||
| ~suggestedBlockIntervals, | ||
| ~mutSuggestedBlockIntervals, | ||
| ~partitionId, | ||
| ) | ||
|
|
||
| let executedBlockInterval = suggestedToBlock - fromBlock + 1 | ||
|
|
||
| // Increase the suggested block interval only when it was actually applied | ||
| // and we didn't query to a hard toBlock | ||
| if executedBlockInterval >= suggestedBlockInterval { | ||
| // We also don't care about it when we have a hard max block interval | ||
| if ( | ||
| executedBlockInterval >= suggestedBlockInterval && | ||
| !(mutSuggestedBlockIntervals->Utils.Dict.has(maxSuggestedBlockIntervalKey)) | ||
| ) { | ||
| // Increase batch size going forward, but do not increase past a configured maximum | ||
| // See: https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease | ||
| suggestedBlockIntervals->Js.Dict.set( | ||
| mutSuggestedBlockIntervals->Js.Dict.set( | ||
| partitionId, | ||
| Pervasives.min( | ||
| executedBlockInterval + syncConfig.accelerationAdditive, | ||
|
|
@@ -634,15 +663,19 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options | |
| ( | ||
| { | ||
| eventConfig: (eventConfig :> Internal.eventConfig), | ||
| timestamp: block->Types.Block.getTimestamp, | ||
| timestamp: block.timestamp, | ||
| blockNumber: block.number, | ||
| chain, | ||
| blockNumber: block->Types.Block.getNumber, | ||
| logIndex: log.logIndex, | ||
| event: { | ||
| chainId: chain->ChainMap.Chain.toChainId, | ||
| params: decodedEvent.args, | ||
| transaction, | ||
| block, | ||
| // Unreliably expect that the Ethers block fields match the types in HyperIndex | ||
| // I assume this is wrong in some cases, so we need to fix it in the future | ||
| block: block->( | ||
| Utils.magic: Ethers.JsonRpcProvider.block => Internal.eventBlock | ||
| ), | ||
| srcAddress: log.address, | ||
| logIndex: log.logIndex, | ||
| }->Internal.fromGenericEvent, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,16 +12,6 @@ type syncConfigOptions = { | |
| fallbackStallTimeout?: int, | ||
| } | ||
|
|
||
| type syncConfig = { | ||
| initialBlockInterval: int, | ||
| backoffMultiplicative: float, | ||
| accelerationAdditive: int, | ||
| intervalCeiling: int, | ||
| backoffMillis: int, | ||
| queryTimeoutMillis: int, | ||
| fallbackStallTimeout: int, | ||
| } | ||
|
|
||
| type historyFlag = FullHistory | MinHistory | ||
| type rollbackFlag = RollbackOnReorg | NoRollback | ||
| type historyConfig = {rollbackFlag: rollbackFlag, historyFlag: historyFlag} | ||
|
|
@@ -36,7 +26,7 @@ let getSyncConfig = ( | |
| ?queryTimeoutMillis, | ||
| ?fallbackStallTimeout, | ||
| }: syncConfigOptions, | ||
| ): syncConfig => { | ||
| ): InternalConfig.sourceSync => { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent 🧩 Analysis chain
Return type switch to InternalConfig.sourceSync looks good; verify template/call-site alignment. This aligns the template with the new public type. Please double-check that all consumers (templates and runtime) now expect InternalConfig.sourceSync and that no references to Config.syncConfig remain. Run to confirm no stale references:
🏁 Script executed:
#!/bin/bash
rg -nP 'Config\.syncConfig\b|:?\s*syncConfig\b'
rg -nP 'InternalConfig\.sourceSync\b'
Length of output: 6790
Stale Config.syncConfig references found—update to InternalConfig.sourceSync
Ensure every call-site uses InternalConfig.sourceSync. 🤖 Prompt for AI Agents |
||
| let queryTimeoutMillis = queryTimeoutMillis->Option.getWithDefault(20_000) | ||
| { | ||
| initialBlockInterval: Env.Configurable.SyncConfig.initialBlockInterval->Option.getWithDefault( | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,41 @@ | ||||||||
| open RescriptMocha | ||||||||
|
|
||||||||
| let testApiToken = "3dc856dd-b0ea-494f-b27e-017b8b6b7e07" | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Hardcoded API token committed — remove and rotate immediately. Never commit secrets, even in skipped tests. Replace with an env-driven value and rotate the exposed token. Apply:
- let testApiToken = "3dc856dd-b0ea-494f-b27e-017b8b6b7e07"
+ // Read from your CI/test runner env and document the requirement.
+ let testApiToken = "<SET_HYPERSYNC_API_TOKEN_IN_ENV>"
Optionally, gate the test at runtime when the token is missing. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||
|
|
||||||||
| describe_skip("Test Hyperliquid broken transaction response", () => { | ||||||||
| Async.it("should handle broken transaction response", async () => { | ||||||||
| let page = await HyperSync.GetLogs.query( | ||||||||
| ~client=HyperSyncClient.make( | ||||||||
| ~url="https://645749.hypersync.xyz", | ||||||||
| ~apiToken=testApiToken, | ||||||||
| ~maxNumRetries=Env.hyperSyncClientMaxRetries, | ||||||||
| ~httpReqTimeoutMillis=Env.hyperSyncClientTimeoutMillis, | ||||||||
| ), | ||||||||
| ~fromBlock=12403138, | ||||||||
| ~toBlock=Some(12403139), | ||||||||
| ~logSelections=[ | ||||||||
| { | ||||||||
| addresses: [], | ||||||||
| topicSelections: [ | ||||||||
| { | ||||||||
| topic0: [ | ||||||||
| "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"->EvmTypes.Hex.fromStringUnsafe, | ||||||||
| ], | ||||||||
| topic1: [], | ||||||||
| topic2: [], | ||||||||
| topic3: [], | ||||||||
| }, | ||||||||
| ], | ||||||||
| }, | ||||||||
| ], | ||||||||
| ~fieldSelection={ | ||||||||
| log: [Address, Data, LogIndex, Topic0, Topic1, Topic2, Topic3], | ||||||||
| transaction: [Hash], | ||||||||
| }, | ||||||||
| ~nonOptionalBlockFieldNames=[], | ||||||||
| ~nonOptionalTransactionFieldNames=["hash"], | ||||||||
| ) | ||||||||
|
|
||||||||
| Js.log(page) | ||||||||
| }) | ||||||||
| }) | ||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Always report missing object; avoid truthiness + magic casts.
The non-object detection is gated by fieldNames->notEmpty. If nonOptionalTransactionFieldNames is empty, a missing transaction object won’t be reported—contrary to the PR objective. Also, relying on JS truthiness via Obj.magic is brittle. Replace with explicit JSON classification and hoist the dict cast.
Apply this refactor:
Update call sites accordingly (outside the changed hunk):
🤖 Prompt for AI Agents