Skip to content

Commit

Permalink
feat(doc-storage): impl indexedDB
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Sep 30, 2024
1 parent 390d25a commit d10a2fe
Show file tree
Hide file tree
Showing 29 changed files with 2,060 additions and 620 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = "1"
serde_json = "1"
sha3 = "0.10"
sqlx = { version = "0.8", default-features = false, features = ["chrono", "macros", "migrate", "runtime-tokio", "sqlite", "tls-rustls"] }
log = "0.4"
tiktoken-rs = "0.5"
tokio = "1.37"
uuid = "1.8"
Expand Down
29 changes: 29 additions & 0 deletions packages/backend/server/src/core/doc/storage/doc.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {
applyUpdate,
diffUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
encodeStateVectorFromUpdate,
mergeUpdates,
UndoManager,
} from 'yjs';
Expand All @@ -19,6 +21,12 @@ export interface DocRecord {
editor?: string;
}

export interface DocDiff {
missing: Uint8Array;
state: Uint8Array;
timestamp: number;
}

export interface DocUpdate {
bin: Uint8Array;
timestamp: number;
Expand Down Expand Up @@ -96,6 +104,27 @@ export abstract class DocStorageAdapter extends Connection {
return snapshot;
}

async getDocDiff(
spaceId: string,
docId: string,
stateVector?: Uint8Array
): Promise<DocDiff | null> {
const doc = await this.getDoc(spaceId, docId);

if (!doc) {
return null;
}

const missing = stateVector ? diffUpdate(doc.bin, stateVector) : doc.bin;
const state = encodeStateVectorFromUpdate(doc.bin);

return {
missing,
state,
timestamp: doc.timestamp,
};
}

abstract pushDocUpdates(
spaceId: string,
docId: string,
Expand Down
31 changes: 12 additions & 19 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
WebSocketGateway,
} from '@nestjs/websockets';
import { Socket } from 'socket.io';
import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';

import {
AlreadyInSpace,
Expand Down Expand Up @@ -233,31 +232,25 @@ export class SpaceSyncGateway
@MessageBody()
{ spaceType, spaceId, docId, stateVector }: LoadDocMessage
): Promise<
EventResponse<{ missing: string; state?: string; timestamp: number }>
EventResponse<{ missing: string; state: string; timestamp: number }>
> {
const adapter = this.selectAdapter(client, spaceType);
adapter.assertIn(spaceId);

const doc = await adapter.get(spaceId, docId);
const doc = await adapter.diff(
spaceId,
docId,
stateVector ? Buffer.from(stateVector, 'base64') : undefined
);

if (!doc) {
throw new DocNotFound({ spaceId, docId });
}

const missing = Buffer.from(
stateVector
? diffUpdate(doc.bin, Buffer.from(stateVector, 'base64'))
: doc.bin
).toString('base64');

const state = Buffer.from(encodeStateVectorFromUpdate(doc.bin)).toString(
'base64'
);

return {
data: {
missing,
state,
missing: Buffer.from(doc.missing).toString('base64'),
state: Buffer.from(doc.state).toString('base64'),
timestamp: doc.timestamp,
},
};
Expand Down Expand Up @@ -600,9 +593,9 @@ abstract class SyncSocketAdapter {
return this.storage.pushDocUpdates(spaceId, docId, updates, editorId);
}

get(spaceId: string, docId: string) {
diff(spaceId: string, docId: string, stateVector?: Uint8Array) {
this.assertIn(spaceId);
return this.storage.getDoc(spaceId, docId);
return this.storage.getDocDiff(spaceId, docId, stateVector);
}

getTimestamps(spaceId: string, timestamp?: number) {
Expand Down Expand Up @@ -630,9 +623,9 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter {
return super.push(spaceId, id.guid, updates, editorId);
}

override get(spaceId: string, docId: string) {
override diff(spaceId: string, docId: string, stateVector?: Uint8Array) {
const id = new DocID(docId, spaceId);
return this.storage.getDoc(spaceId, id.guid);
return this.storage.getDocDiff(spaceId, id.guid, stateVector);
}

async assertAccessible(
Expand Down
6 changes: 5 additions & 1 deletion packages/common/doc-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
"./impls/*": "./impls/*",
"./storage": "./storage/index.ts"
},
"dependencies": {
"@affine/native": "workspace:*",
"idb": "^8.0.0",
"lodash-es": "^4.17.21",
"socket.io-client": "^4.7.5",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
Expand Down
232 changes: 232 additions & 0 deletions packages/common/doc-storage/src/impls/affine-cloud.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import type { Socket } from 'socket.io-client';

import { DocStorageAdapter } from '../storage';

// TODO(@forehalo): use [UserFriendlyError]
interface EventError {
name: string;
message: string;
}

type WebsocketResponse<T> =
| {
error: EventError;
}
| {
data: T;
};

interface ServerEvents {
'space:broadcast-doc-updates': {
spaceType: string;
spaceId: string;
docId: string;
updates: string[];
timestamp: number;
};
}

interface ClientEvents {
'space:join': [
{ spaceType: string; spaceId: string; clientVersion: string },
{ clientId: string },
];
'space:leave': { spaceType: string; spaceId: string };
'space:push-doc-updates': [
{ spaceType: string; spaceId: string; docId: string; updates: string[] },
{ timestamp: number },
];
'space:load-doc-timestamps': [
{
spaceType: string;
spaceId: string;
timestamp?: number;
},
Record<string, number>,
];
'space:load-doc': [
{
spaceType: string;
spaceId: string;
docId: string;
stateVector?: string;
},
{
missing: string;
state: string;
timestamp: number;
},
];
}

type ServerEventsMap = {
[Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void;
};
type ClientEventsMap = {
[Key in keyof ClientEvents]: ClientEvents[Key] extends Array<any>
? (
data: ClientEvents[Key][0],
ack: (res: WebsocketResponse<ClientEvents[Key][1]>) => void
) => void
: (data: ClientEvents[Key]) => void;
};

export class AFFiNECloudDocStorageAdapter extends DocStorageAdapter {
constructor(
private readonly spaceType: string,
protected override spaceId: string,
private readonly socket: Socket<ServerEventsMap, ClientEventsMap>
) {
super();
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
this.socket.on('space:broadcast-doc-updates', this.onServerUpdates);
}

private async clientHandShake() {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});

if ('error' in res) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
}

override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-updates', this.onServerUpdates);
}

onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => {
if (
this.spaceType === message.spaceType &&
this.spaceId === message.spaceId
) {
this.dispatchDocUpdatesListeners(
message.docId,
message.updates.map(base64ToUint8Array),
message.timestamp
);
}
};

override async getDocDiff(docId: string, stateVector?: Uint8Array) {
const response = await this.socket.emitWithAck('space:load-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
stateVector: stateVector ? await uint8ArrayToBase64(stateVector) : void 0,
});

if ('error' in response) {
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
}

return {
missing: base64ToUint8Array(response.data.missing),
state: base64ToUint8Array(response.data.state),
timestamp: response.data.timestamp,
};
}

async pushDocUpdates(docId: string, updates: Uint8Array[]): Promise<number> {
const response = await this.socket.emitWithAck('space:push-doc-updates', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
updates: await Promise.all(updates.map(uint8ArrayToBase64)),
});

if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
}

return response.data.timestamp;
}

async getSpaceDocTimestamps(
after?: number
): Promise<Record<string, number> | null> {
const response = await this.socket.emitWithAck(
'space:load-doc-timestamps',
{
spaceType: this.spaceType,
spaceId: this.spaceId,
timestamp: after,
}
);

if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
}

return response.data;
}

// TODO(@forehalo): is there a good way to avoid this boilerplate?
async deleteDoc(): Promise<void> {}
async deleteSpace(): Promise<void> {}
async listDocHistories() {
return [];
}
async getDocHistory() {
return null;
}
protected async getDocSnapshot() {
return null;
}
protected async setDocSnapshot() {
return false;
}
protected async getDocUpdates() {
return [];
}
protected async markUpdatesMerged() {
return 0;
}
protected async createDocHistory() {
return false;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
Loading

0 comments on commit d10a2fe

Please sign in to comment.