Skip to content

Commit

Permalink
feat: add chunk snapshot and stream service (#139)
Browse files Browse the repository at this point in the history
* feat: add snapshot load in chunks via a stream

* feat: wip stream service integration

* feat: add tx hash to ecs events

* chore: re-add fetchSnapshotChunked

* chore: increase snapshot port to avoid local port conflict with both services

* feat: stream service integration stubs and fix test

* feat: add WIP stubs for using 'subscribeToStreamLatest'

* refactor(network): move stream service logic into createLatestEventStreamService util

* feat(network): add stream to ecs event transform functions

* chore(network): rename imported function

* docs(network): add docs for stream service transform functions

* chore(docs): fix typo

* test(network): add tests for streaming service logic

* chore: rename latestEventStream to latestEvent$

Co-authored-by: alvrs <alvarius@lattice.xyz>
  • Loading branch information
authcall and alvrs authored Sep 14, 2022
1 parent 29094c4 commit 8c9d4b3
Show file tree
Hide file tree
Showing 18 changed files with 324 additions and 104 deletions.
1 change: 1 addition & 0 deletions packages/network/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export async function setupContracts<C extends ContractComponents>(world: World,
chainId: config.chainId,
disableCache: false,
checkpointServiceUrl: networkConfig.checkpointServiceUrl,
streamServiceUrl: networkConfig.streamServiceUrl,
});
}

Expand Down
2 changes: 2 additions & 0 deletions packages/network/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface NetworkConfig {
clock: ClockConfig;
provider: ProviderConfig;
checkpointServiceUrl?: string;
streamServiceUrl?: string;
initialBlockNumber?: number;
}

Expand Down Expand Up @@ -85,6 +86,7 @@ export type SyncWorkerConfig<Cm extends Components = Components> = {
disableCache?: boolean;
chainId: number;
checkpointServiceUrl?: string;
streamServiceUrl?: string;
};

export enum ContractSchemaValue {
Expand Down
42 changes: 39 additions & 3 deletions packages/network/src/workers/SyncWorker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { createCacheStore, storeEvent } from "./CacheStore";
import * as syncUtils from "./syncUtils";
import "fake-indexeddb/auto";
import { GodID, SyncState } from "./constants";
import { createLatestEventStreamRPC, createLatestEventStreamService } from "./syncUtils";

// Test constants
const cacheBlockNumber = 99;
Expand Down Expand Up @@ -79,9 +80,10 @@ jest.mock("../createBlockNumberStream", () => ({
jest.mock("./syncUtils", () => ({
...jest.requireActual("./syncUtils"),
createFetchWorldEventsInBlockRange: () => () => Promise.resolve([]),
createLatestEventStream: () => latestEvent$,
createLatestEventStreamRPC: jest.fn(() => latestEvent$),
createLatestEventStreamService: jest.fn(() => latestEvent$),
getSnapshotBlockNumber: () => Promise.resolve(snapshotBlockNumber),
fetchSnapshot: () => {
fetchSnapshotChunked: () => {
const store = createCacheStore();
for (const event of snapshotEvents) storeEvent(store, event);
return store;
Expand Down Expand Up @@ -117,6 +119,7 @@ describe("Sync.worker", () => {

afterEach(() => {
subscription?.unsubscribe();
jest.clearAllMocks();
});

it("should report the current loading state via the `component.LoadingState` component", async () => {
Expand Down Expand Up @@ -154,6 +157,7 @@ describe("Sync.worker", () => {
it("should pass live events to the output", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
Expand All @@ -179,9 +183,38 @@ describe("Sync.worker", () => {
expect(output).toHaveBeenCalledWith(event);
});

it("should sync live events from rpc if streaming service is not available", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
initialBlockNumber: 0,
});
await sleep(0);
expect(createLatestEventStreamRPC).toHaveBeenCalled();
expect(createLatestEventStreamService).not.toHaveBeenCalled();
});

it("should sync live events from streaming service if streaming service is available", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "http://localhost:50052",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
initialBlockNumber: 0,
});
await sleep(0);
expect(createLatestEventStreamRPC).not.toHaveBeenCalled();
expect(createLatestEventStreamService).toHaveBeenCalled();
});

it("should sync from the snapshot if the snapshot block number is more than 100 blocks newer than then cache", async () => {
input$.next({
checkpointServiceUrl: "http://localhost:50052",
checkpointServiceUrl: "http://localhost:50062",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
Expand All @@ -205,6 +238,7 @@ describe("Sync.worker", () => {
it("should sync from the cache if the snapshot service is not available", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
Expand All @@ -228,6 +262,7 @@ describe("Sync.worker", () => {
it("should fetch the state diff between cache/snapshot and current block number", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
Expand Down Expand Up @@ -259,6 +294,7 @@ describe("Sync.worker", () => {
it("should first load from cache, then fetch the state gap, then pass live events", async () => {
input$.next({
checkpointServiceUrl: "",
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
Expand Down
16 changes: 12 additions & 4 deletions packages/network/src/workers/SyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import {
createSnapshotClient,
createDecode,
createFetchWorldEventsInBlockRange,
createLatestEventStream,
getSnapshotBlockNumber,
fetchSnapshot,
fetchStateInBlockRange,
fetchSnapshotChunked,
createLatestEventStreamRPC,
createLatestEventStreamService,
createTransformWorldEventsFromStream,
} from "./syncUtils";
import { createBlockNumberStream } from "../createBlockNumberStream";
import { GodID, SyncState } from "./constants";
Expand Down Expand Up @@ -76,6 +78,7 @@ export class SyncWorker<Cm extends Components> implements DoWork<SyncWorkerConfi
const computedConfig = await streamToDefinedComputed(this.input$);
const {
checkpointServiceUrl,
streamServiceUrl,
chainId,
worldContract,
provider: { options: providerOptions },
Expand All @@ -94,6 +97,7 @@ export class SyncWorker<Cm extends Components> implements DoWork<SyncWorkerConfi
providerOptions?.batch,
decode
);
const transformWorldEvents = createTransformWorldEventsFromStream(decode);

// Start syncing current events, but only start streaming to output once gap between initial state and current block is closed

Expand All @@ -102,7 +106,11 @@ export class SyncWorker<Cm extends Components> implements DoWork<SyncWorkerConfi
let passLiveEventsToOutput = false;
const cacheStore = { current: createCacheStore() };
const { blockNumber$ } = createBlockNumberStream(providers);
createLatestEventStream(blockNumber$, fetchWorldEvents).subscribe((event) => {
const latestEvent$ = streamServiceUrl
? createLatestEventStreamService(streamServiceUrl, worldContract.address, transformWorldEvents)
: createLatestEventStreamRPC(blockNumber$, fetchWorldEvents);

latestEvent$.subscribe((event) => {
storeEvent(cacheStore.current, event);
if (passLiveEventsToOutput) this.output$.next(event as Output<Cm>);
});
Expand All @@ -123,7 +131,7 @@ export class SyncWorker<Cm extends Components> implements DoWork<SyncWorkerConfi

if (syncFromSnapshot) {
this.setLoadingState(SyncState.INITIAL, "Fetching initial state from snapshot", 50);
initialState = await fetchSnapshot(snapshotClient, worldContract.address, decode);
initialState = await fetchSnapshotChunked(snapshotClient, worldContract.address, decode);
} else {
this.setLoadingState(SyncState.INITIAL, "Fetching initial state from cache", 50);
initialState = await loadIndexDbCacheStore(indexDbCache);
Expand Down
13 changes: 9 additions & 4 deletions packages/network/src/workers/syncUtils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ import { sleep } from "@latticexyz/utils";
import { Subject } from "rxjs";
import { NetworkComponentUpdate } from "../types";
import { getCacheStoreEntries } from "./CacheStore";
import { createLatestEventStream, createSnapshotClient, createWorldTopics, fetchStateInBlockRange } from "./syncUtils";
import {
createLatestEventStreamRPC,
createSnapshotClient,
createWorldTopics,
fetchStateInBlockRange,
} from "./syncUtils";

describe("syncUtils", () => {
describe("createSnapshotClient", () => {
it("should not error", () => {
const snapshotClient = createSnapshotClient("http://localhost:50052");
const snapshotClient = createSnapshotClient("http://localhost:50062");
expect(snapshotClient).toBeDefined();
});
});
Expand All @@ -21,7 +26,7 @@ describe("syncUtils", () => {
it.todo("end-to-end test");
});

describe("createLatestEventStream", () => {
describe("createLatestEventStreamRPC", () => {
it("should fetch world events for new block range when a new block number arrives", async () => {
const blockNumber$ = new Subject<number>();
const event: NetworkComponentUpdate = {
Expand All @@ -36,7 +41,7 @@ describe("syncUtils", () => {
const fetchWorldEvents = jest.fn(() => Promise.resolve([event, event]));
const latestEvent = jest.fn();

createLatestEventStream(blockNumber$, fetchWorldEvents).subscribe(latestEvent);
createLatestEventStreamRPC(blockNumber$, fetchWorldEvents).subscribe(latestEvent);

expect(fetchWorldEvents).not.toHaveBeenCalled();

Expand Down
Loading

0 comments on commit 8c9d4b3

Please sign in to comment.