Skip to content

Commit

Permalink
refactor(doc-storage): operation pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 31, 2024
1 parent 3f2a537 commit 80264b8
Show file tree
Hide file tree
Showing 58 changed files with 2,226 additions and 994 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 58 additions & 0 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,23 @@ interface LeaveSpaceAwarenessMessage {
docId: string;
}

/**
* @deprecated
*/
interface PushDocUpdatesMessage {
spaceType: SpaceType;
spaceId: string;
docId: string;
updates: string[];
}

interface PushDocUpdateMessage {
spaceType: SpaceType;
spaceId: string;
docId: string;
update: string;
}

interface LoadDocMessage {
spaceType: SpaceType;
spaceId: string;
Expand Down Expand Up @@ -237,6 +247,9 @@ export class SpaceSyncGateway
};
}

/**
* @deprecated use [space:push-doc-update] instead, client should always merge updates on their own
*/
@SubscribeMessage('space:push-doc-updates')
async onReceiveDocUpdates(
@ConnectedSocket() client: Socket,
Expand Down Expand Up @@ -281,6 +294,51 @@ export class SpaceSyncGateway
};
}

@SubscribeMessage('space:push-doc-update')
async onReceiveDocUpdate(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
message: PushDocUpdateMessage
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
const { spaceType, spaceId, docId, update } = message;
const adapter = this.selectAdapter(client, spaceType);

// TODO(@forehalo): we might need to check write permission before push updates
const timestamp = await adapter.push(
spaceId,
docId,
[Buffer.from(update, 'base64')],
user.id
);

// TODO(@forehalo): separate different version of clients into different rooms,
// so the clients won't receive useless updates events
client.to(adapter.room(spaceId)).emit('space:broadcast-doc-updates', {
spaceType,
spaceId,
docId,
updates: [update],
timestamp,
});

client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', {
spaceType,
spaceId,
docId,
update,
timestamp,
editor: user.id,
});

return {
data: {
accepted: true,
timestamp,
},
};
}

@SubscribeMessage('space:load-doc-timestamps')
async onLoadDocTimestamps(
@ConnectedSocket() client: Socket,
Expand Down
97 changes: 0 additions & 97 deletions packages/common/doc-storage/README.md

This file was deleted.

22 changes: 14 additions & 8 deletions packages/common/doc-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,25 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./index.ts",
"./impls/*": "./impls/*",
"./storage": "./storage/index.ts"
".": "./index.ts"
},
"dependencies": {
"@affine/graphql": "workspace:*",
"@affine/native": "workspace:*",
"idb": "^8.0.0",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"socket.io-client": "^4.7.5",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"@types/lodash-es": "^4.17.12"
"@affine/electron-api": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/native": "workspace:*",
"idb": "^8.0.0",
"socket.io-client": "^4.7.5"
},
"peerDependencies": {
"@affine/electron-api": "workspace:*",
"@affine/graphql": "workspace:*",
"@affine/native": "workspace:*",
"idb": "^8.0.0",
"socket.io-client": "^4.7.5"
}
}
102 changes: 102 additions & 0 deletions packages/common/doc-storage/src/client/backend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import EventEmitter2 from 'eventemitter2';

import {
ConnectOp,
DestroyOp,
DisconnectOp,
OpConsumer,
type OpSubscribableHandler,
SubscribeConnectionStatusOp,
} from '../op';
import type { Storage, StorageType } from '../storage';

export class PeerStorageBackend extends OpConsumer {
private readonly storages: Map<StorageType, Storage> = new Map();
private readonly replacedStorages: Set<Storage> = new Set();
private readonly event = new EventEmitter2();
private readonly storageListeners: WeakMap<Storage, () => void> = new Map();

constructor(port: MessagePort) {
super(port);
this.register(ConnectOp, this.connect);
this.register(DisconnectOp, this.disconnect);
this.register(DestroyOp, this.destroy);
this.registerSubscribable(
SubscribeConnectionStatusOp,
this.onStatusChanged
);
this.listen();
}

addBackendStorage<T extends new (...args: any) => Storage>(
Impl: T,
...opts: ConstructorParameters<T>
) {
const storage = new Impl(...opts);
const registered = this.storages.get(storage.storageType);
if (registered) {
this.replacedStorages.add(registered);
this.storageListeners.get(registered)?.();
}
this.storageListeners.set(
storage,
storage.connection.onStatusChanged((status, error) => {
this.event.emit('statusChanged', {
storageType: storage.storageType,
status,
error,
});
})
);
this.storages.set(storage.storageType, storage);
}

connect = async () => {
await Promise.allSettled(
Array.from(this.storages.values()).map(async storage => {
await storage.connect();
})
);

await Promise.allSettled(
Array.from(this.replacedStorages).map(async storage => {
await storage.disconnect();
this.replacedStorages.delete(storage);
})
);
};

disconnect = async () => {
await Promise.allSettled(
Object.values(this.storages).map(async storage => {
await storage.disconnect();
})
);
await Promise.allSettled(
Array.from(this.replacedStorages).map(async storage => {
await storage.disconnect();
this.replacedStorages.delete(storage);
})
);
};

override destroy = async () => {
this.close();
super.destroy();
await this.disconnect();
this.storages.clear();
this.replacedStorages.clear();
this.event.removeAllListeners();
};

onStatusChanged: OpSubscribableHandler<SubscribeConnectionStatusOp> = (
_,
callback
) => {
this.event.on('statusChanged', callback);

return () => {
this.event.off('statusChanged', callback);
};
};
}
49 changes: 49 additions & 0 deletions packages/common/doc-storage/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {
ConnectOp,
DisconnectOp,
OpProducer,
SubscribeConnectionStatusOp,
} from '../op';
import { PeerStorageBackend } from './backend';

class PeerStorageClient extends OpProducer {
constructor(
port: MessagePort,
protected readonly backend: PeerStorageBackend
) {
super(port);
}

addStorage = this.backend.addBackendStorage.bind(this.backend);

async connect() {
this.listen();
await this.send(new ConnectOp());
}

async disconnect() {
this.close();
await this.send(new DisconnectOp());
}

async destroy() {
await this.backend.destroy();
this.close;
}

onConnectionStatusChanged() {
this.subscribe(new SubscribeConnectionStatusOp(), (/* storage */) => {});
}
}

export function createPeerStorageClient() {
const channel = new MessageChannel();
const producerPort = channel.port1;
const consumerPort = channel.port2;

const backend = new PeerStorageBackend(consumerPort);

const client = new PeerStorageClient(producerPort, backend);

return client;
}
Loading

0 comments on commit 80264b8

Please sign in to comment.