Skip to content

Commit

Permalink
feat(network): add system call stream (#162)
Browse files Browse the repository at this point in the history
* feat: system call stream

* fix(network): add type argument to createSyncWorker

* fix(network): properly cache blocks when fetching system call information

* fix(std-client): only pass NetworkComponentUpdateEvent to applyNetworkUpdates

* fix(network): ignore system calls during initial sync

* fix(network): early return before storing to cache during initial sync

Co-authored-by: alvarius <89248902+alvrs@users.noreply.github.com>
Co-authored-by: alvrs <alvarius@lattice.xyz>
  • Loading branch information
3 people authored Sep 26, 2022
1 parent d720277 commit 5caef57
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 40 deletions.
9 changes: 4 additions & 5 deletions packages/network/src/createSyncWorker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Components } from "@latticexyz/recs";
import { fromWorker } from "@latticexyz/utils";
import { Subject } from "rxjs";
import { NetworkComponentUpdate, SyncWorkerConfig } from "./types";
import { Output } from "./workers/SyncWorker";
import { NetworkEvent, SyncWorkerConfig } from "./types";

/**
* Create a new SyncWorker ({@link Sync.worker.ts}) to performn contract/client state sync.
Expand All @@ -14,13 +13,13 @@ import { Output } from "./workers/SyncWorker";
* dispose: function to dispose of the sync worker
* }
*/
export function createSyncWorker<Cm extends Components>() {
export function createSyncWorker<C extends Components = Components>() {
const config$ = new Subject<SyncWorkerConfig>();
const worker = new Worker(new URL("./workers/Sync.worker.ts", import.meta.url), { type: "module" });
const ecsEvent$ = new Subject<NetworkComponentUpdate<Cm>>();
const ecsEvent$ = new Subject<NetworkEvent<C>>();

// Pass in a "config stream", receive a stream of ECS events
const subscription = fromWorker<SyncWorkerConfig, Output<Cm>>(worker, config$).subscribe(ecsEvent$);
const subscription = fromWorker<SyncWorkerConfig, NetworkEvent<C>>(worker, config$).subscribe(ecsEvent$);
const dispose = () => {
worker.terminate();
subscription?.unsubscribe();
Expand Down
34 changes: 33 additions & 1 deletion packages/network/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Result } from "@ethersproject/abi";
import { Components, ComponentValue, EntityID, SchemaOf } from "@latticexyz/recs";
import { Cached } from "@latticexyz/utils";
import { BaseContract, ContractInterface } from "ethers";
import { BaseContract, BigNumber, ContractInterface } from "ethers";
import { Observable } from "rxjs";

export interface NetworkConfig {
Expand Down Expand Up @@ -69,6 +69,7 @@ export type Mappings<C extends Components> = {

export type NetworkComponentUpdate<C extends Components = Components> = {
[key in keyof C]: {
type: NetworkEvents.NetworkComponentUpdate;
component: key & string;
value: ComponentValue<SchemaOf<C[key]>> | undefined;
};
Expand All @@ -79,6 +80,36 @@ export type NetworkComponentUpdate<C extends Components = Components> = {
blockNumber: number;
};

export type SystemCallTransaction = {
hash: string;
to: string;
data: string;
value: BigNumber;
};

export type SystemCall<C extends Components = Components> = {
type: NetworkEvents.SystemCall;
tx: SystemCallTransaction;
updates: NetworkComponentUpdate<C>[];
};

export enum NetworkEvents {
SystemCall = "SystemCall",
NetworkComponentUpdate = "NetworkComponentUpdate",
}

export type NetworkEvent<C extends Components = Components> = NetworkComponentUpdate<C> | SystemCall<C>;

export function isSystemCallEvent<C extends Components>(e: NetworkEvent<C>): e is SystemCall<C> {
return e.type === NetworkEvents.SystemCall;
}

export function isNetworkComponentUpdateEvent<C extends Components>(
e: NetworkEvent<C>
): e is NetworkComponentUpdate<C> {
return e.type === NetworkEvents.NetworkComponentUpdate;
}

export type SyncWorkerConfig = {
provider: ProviderConfig;
initialBlockNumber: number;
Expand All @@ -87,6 +118,7 @@ export type SyncWorkerConfig = {
chainId: number;
checkpointServiceUrl?: string;
streamServiceUrl?: string;
fetchSystemCalls?: boolean;
};

export enum ContractSchemaValue {
Expand Down
26 changes: 25 additions & 1 deletion packages/network/src/workers/CacheStore.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EntityID } from "@latticexyz/recs";
import { packTuple } from "@latticexyz/utils";
import { NetworkComponentUpdate } from "../types";
import { NetworkComponentUpdate, NetworkEvents } from "../types";
import {
createCacheStore,
getCacheStoreEntries,
Expand Down Expand Up @@ -29,6 +29,7 @@ describe("CacheStore", () => {
describe("storeEvent", () => {
it("should store events to the cacheStore", () => {
const event: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -49,6 +50,7 @@ describe("CacheStore", () => {
expect([...cacheStore.state.entries()]).toEqual([[packTuple([0, 0]), { x: 1, y: 2 }]]);

const event2: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -73,6 +75,7 @@ describe("CacheStore", () => {

it("should normalize hex entity ids to the same padding", () => {
const event1: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00000000000000000000000001" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -82,6 +85,7 @@ describe("CacheStore", () => {
};

const event2: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00001" as EntityID,
component: "Position",
value: { x: 1, y: 3 },
Expand All @@ -97,6 +101,7 @@ describe("CacheStore", () => {
const events = [...getCacheStoreEntries(cacheStore)];
expect(events.length).toBe(1);
expect(events[0]).toEqual({
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01" as EntityID,
component: "Position",
value: { x: 1, y: 3 },
Expand All @@ -108,6 +113,7 @@ describe("CacheStore", () => {

it("should set block number to one less than the last received event", () => {
const event: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -130,6 +136,7 @@ describe("CacheStore", () => {
const cacheStore = createCacheStore();

const event: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -142,6 +149,7 @@ describe("CacheStore", () => {

expect([...getCacheStoreEntries(cacheStore)]).toEqual([
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00",
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -152,6 +160,7 @@ describe("CacheStore", () => {
]);

const event2: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 2, y: 2 },
Expand All @@ -164,6 +173,7 @@ describe("CacheStore", () => {

expect([...getCacheStoreEntries(cacheStore)]).toEqual([
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00",
component: "Position",
value: { x: 2, y: 2 },
Expand All @@ -174,6 +184,7 @@ describe("CacheStore", () => {
]);

const event3: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01" as EntityID,
component: "Position",
value: { x: -1, y: 2 },
Expand All @@ -186,6 +197,7 @@ describe("CacheStore", () => {

expect([...getCacheStoreEntries(cacheStore)]).toEqual([
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00",
component: "Position",
value: { x: 2, y: 2 },
Expand All @@ -194,6 +206,7 @@ describe("CacheStore", () => {
txHash: "cache",
},
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01",
component: "Position",
value: { x: -1, y: 2 },
Expand All @@ -211,6 +224,7 @@ describe("CacheStore", () => {
const cacheStore2 = createCacheStore();

const event1: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
Expand All @@ -220,6 +234,7 @@ describe("CacheStore", () => {
};

const event2: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01" as EntityID,
component: "Health",
value: { value: 1 },
Expand All @@ -232,6 +247,7 @@ describe("CacheStore", () => {
storeEvent(cacheStore1, event2);

const event3: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 3, y: 2 },
Expand All @@ -241,6 +257,7 @@ describe("CacheStore", () => {
};

const event4: NetworkComponentUpdate = {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Speed",
value: { value: 10 },
Expand All @@ -258,6 +275,7 @@ describe("CacheStore", () => {

expect(entries).toEqual([
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00",
component: "Position",
value: { x: 3, y: 2 },
Expand All @@ -266,6 +284,7 @@ describe("CacheStore", () => {
txHash: "cache",
},
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01",
component: "Health",
value: { value: 1 },
Expand All @@ -274,6 +293,7 @@ describe("CacheStore", () => {
txHash: "cache",
},
{
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00",
component: "Speed",
value: { value: 10 },
Expand All @@ -292,27 +312,31 @@ describe("CacheStore", () => {
const cacheStore = createCacheStore();

storeEvent(cacheStore, {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 1, y: 2 },
blockNumber: 1,
});

storeEvent(cacheStore, {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x01" as EntityID,
component: "Health",
value: { value: 1 },
blockNumber: 2,
});

storeEvent(cacheStore, {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Position",
value: { x: 3, y: 2 },
blockNumber: 3,
});

storeEvent(cacheStore, {
type: NetworkEvents.NetworkComponentUpdate,
entity: "0x00" as EntityID,
component: "Speed",
value: { value: 10 },
Expand Down
3 changes: 2 additions & 1 deletion packages/network/src/workers/CacheStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Components, ComponentValue, EntityID, SchemaOf } from "@latticexyz/recs
import { packTuple, transformIterator, unpackTuple } from "@latticexyz/utils";
import { initCache } from "../initCache";
import { ECSStateReply } from "@latticexyz/services/protobuf/ts/ecs-snapshot/ecs-snapshot";
import { NetworkComponentUpdate } from "../types";
import { NetworkComponentUpdate, NetworkEvents } from "../types";
import { BigNumber } from "ethers";

export type State = Map<number, ComponentValue>;
Expand Down Expand Up @@ -83,6 +83,7 @@ export function getCacheStoreEntries<Cm extends Components>({
}

const ecsEvent: NetworkComponentUpdate<Cm> = {
type: NetworkEvents.NetworkComponentUpdate,
component,
entity: entity as EntityID,
value: value as ComponentValue<SchemaOf<Cm[keyof Cm]>>,
Expand Down
Loading

0 comments on commit 5caef57

Please sign in to comment.