From 1770620af19746578b99a996f380f5117bfdd053 Mon Sep 17 00:00:00 2001 From: alvarius Date: Mon, 6 Jan 2025 11:48:44 +0100 Subject: [PATCH] feat(store-sync): update watchLogs rpc response type (#3356) Co-authored-by: Kevin Ingersoll --- .changeset/dirty-tools-hammer.md | 5 +++ packages/store-sync/src/watchLogs.ts | 50 +++++++++++++--------------- 2 files changed, 29 insertions(+), 26 deletions(-) create mode 100644 .changeset/dirty-tools-hammer.md diff --git a/.changeset/dirty-tools-hammer.md b/.changeset/dirty-tools-hammer.md new file mode 100644 index 0000000000..7e8ffc6583 --- /dev/null +++ b/.changeset/dirty-tools-hammer.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-sync": patch +--- + +Updated the `watchLogs` util to accept the updated RPC response type. diff --git a/packages/store-sync/src/watchLogs.ts b/packages/store-sync/src/watchLogs.ts index 9b35337a6e..4898feece3 100644 --- a/packages/store-sync/src/watchLogs.ts +++ b/packages/store-sync/src/watchLogs.ts @@ -18,6 +18,11 @@ type WatchLogsResult = { logs$: Observable; }; +type WatchLogsEvent = { + blockNumber: string; + logs: RpcLog[]; +}; + export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult { const topics = [ storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })), @@ -94,8 +99,7 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog client.socket.addEventListener("message", (message) => { const response = JSON.parse(message.data); if ("error" in response) { - debug("was error, returning error to subscriber"); - // Return JSON-RPC errors to the subscriber + debug("JSON-RPC error, returning error to subscriber"); subscriber.error(response.error); return; } @@ -103,15 +107,16 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog // Parse the logs from wiresaw_watchLogs if ("params" in response && response.params.subscription === subscriptionId) { debug("parsing logs"); - const logs: RpcLog[] = response.params.result; - const formattedLogs = logs.map((log) => formatLog(log)); + const result: WatchLogsEvent = response.params.result; + const formattedLogs = result.logs.map((log) => formatLog(log)); const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); - debug("got logs", parsedLogs); + const blockNumber = BigInt(result.blockNumber); + debug("got logs", parsedLogs, "for pending block", blockNumber); if (caughtUp) { debug("handing off logs to subscriber"); - const blockNumber = parsedLogs[0].blockNumber; subscriber.next({ blockNumber, logs: parsedLogs }); - resumeBlock = blockNumber + 1n; + // Since this the event's block number corresponds to a pending block, we have to refetch this block in case of a restart + resumeBlock = blockNumber; } else { debug("buffering logs"); logBuffer.push(...parsedLogs); @@ -125,11 +130,12 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog debug("fetching initial logs"); const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); debug("got initial logs", initialLogs); - const logs = [...initialLogs, ...logBuffer].sort(logSort); + const logs = [...initialLogs.logs, ...logBuffer].sort(logSort); debug("combining with log buffer", logs); - const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; + const blockNumber = logs.at(-1)?.blockNumber ?? initialLogs.blockNumber; subscriber.next({ blockNumber, logs }); - resumeBlock = blockNumber + 1n; + // Since this the block number can correspond to a pending block, we have to refetch this block in case of a restart + resumeBlock = blockNumber; caughtUp = true; } catch (error) { debug("could not get initial logs", error); @@ -159,7 +165,7 @@ async function fetchInitialLogs({ address, topics, fromBlock, -}: FetchInitialLogsInput): Promise { +}: FetchInitialLogsInput): Promise<{ blockNumber: bigint; logs: StoreEventsLog[] }> { // Fetch latest block number const latestBlockNumber: Hex = ( await client.requestAsync({ @@ -169,25 +175,17 @@ async function fetchInitialLogs({ }) ).result; - const [catchUpLogs, pendingLogs] = await Promise.all([ - // Request all logs from `fromBlock` to the latest block number - client.requestAsync({ + // Request all logs from `fromBlock` to the latest block number + const rawInitialLogs: RpcLog[] = await client + .requestAsync({ body: { method: "eth_getLogs", params: [{ address, topics, fromBlock: toHex(fromBlock), toBlock: latestBlockNumber }], }, - }), - // Request all logs from the current pending block - client.requestAsync({ - body: { - method: "wiresaw_getLogs", - params: [{ address, topics, fromBlock: latestBlockNumber }], - }, - }), - ]); + }) + .then((res) => res.result); // Return all logs from `fromBlock` until the current pending block state as initial result - const rawLogs: RpcLog[] = [...catchUpLogs.result, ...pendingLogs.result]; - const formattedLogs = rawLogs.map((log) => formatLog(log)); - return parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + const formattedLogs = rawInitialLogs.map((log) => formatLog(log)); + return { blockNumber: BigInt(latestBlockNumber), logs: parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }) }; }