Skip to content

Commit

Permalink
feat(store-sync): update watchLogs rpc response type (#3356)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Ingersoll <kingersoll@gmail.com>
  • Loading branch information
alvrs and holic authored Jan 6, 2025
1 parent 68db8fc commit 1770620
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .changeset/dirty-tools-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Updated the `watchLogs` util to accept the updated RPC response type.
50 changes: 24 additions & 26 deletions packages/store-sync/src/watchLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ type WatchLogsResult = {
logs$: Observable<StorageAdapterBlock>;
};

type WatchLogsEvent = {
blockNumber: string;
logs: RpcLog[];
};

export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult {
const topics = [
storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })),
Expand Down Expand Up @@ -94,24 +99,24 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
client.socket.addEventListener("message", (message) => {
const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
debug("JSON-RPC error, returning error to subscriber");
subscriber.error(response.error);
return;
}

// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const result: WatchLogsEvent = response.params.result;
const formattedLogs = result.logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
const blockNumber = BigInt(result.blockNumber);
debug("got logs", parsedLogs, "for pending block", blockNumber);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
// Since this the event's block number corresponds to a pending block, we have to refetch this block in case of a restart
resumeBlock = blockNumber;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
Expand All @@ -125,11 +130,12 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got initial logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const logs = [...initialLogs.logs, ...logBuffer].sort(logSort);
debug("combining with log buffer", logs);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
const blockNumber = logs.at(-1)?.blockNumber ?? initialLogs.blockNumber;
subscriber.next({ blockNumber, logs });
resumeBlock = blockNumber + 1n;
// Since this the block number can correspond to a pending block, we have to refetch this block in case of a restart
resumeBlock = blockNumber;
caughtUp = true;
} catch (error) {
debug("could not get initial logs", error);
Expand Down Expand Up @@ -159,7 +165,7 @@ async function fetchInitialLogs({
address,
topics,
fromBlock,
}: FetchInitialLogsInput): Promise<StoreEventsLog[]> {
}: FetchInitialLogsInput): Promise<{ blockNumber: bigint; logs: StoreEventsLog[] }> {
// Fetch latest block number
const latestBlockNumber: Hex = (
await client.requestAsync({
Expand All @@ -169,25 +175,17 @@ async function fetchInitialLogs({
})
).result;

const [catchUpLogs, pendingLogs] = await Promise.all([
// Request all logs from `fromBlock` to the latest block number
client.requestAsync({
// Request all logs from `fromBlock` to the latest block number
const rawInitialLogs: RpcLog[] = await client
.requestAsync({
body: {
method: "eth_getLogs",
params: [{ address, topics, fromBlock: toHex(fromBlock), toBlock: latestBlockNumber }],
},
}),
// Request all logs from the current pending block
client.requestAsync({
body: {
method: "wiresaw_getLogs",
params: [{ address, topics, fromBlock: latestBlockNumber }],
},
}),
]);
})
.then((res) => res.result);

// Return all logs from `fromBlock` until the current pending block state as initial result
const rawLogs: RpcLog[] = [...catchUpLogs.result, ...pendingLogs.result];
const formattedLogs = rawLogs.map((log) => formatLog(log));
return parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
const formattedLogs = rawInitialLogs.map((log) => formatLog(log));
return { blockNumber: BigInt(latestBlockNumber), logs: parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }) };
}

0 comments on commit 1770620

Please sign in to comment.