-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
19 changed files
with
869 additions
and
101 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
import type { Socket } from 'socket.io-client'; | ||
|
||
import { DocStorageAdapter } from '../storage'; | ||
|
||
// TODO(@forehalo): use [UserFriendlyError] | ||
interface EventError { | ||
name: string; | ||
message: string; | ||
} | ||
|
||
type WebsocketResponse<T> = | ||
| { | ||
error: EventError; | ||
} | ||
| { | ||
data: T; | ||
}; | ||
|
||
interface ServerEvents { | ||
'space:broadcast-doc-updates': { | ||
spaceType: string; | ||
spaceId: string; | ||
docId: string; | ||
updates: string[]; | ||
timestamp: number; | ||
}; | ||
} | ||
|
||
interface ClientEvents { | ||
'space:join': [ | ||
{ spaceType: string; spaceId: string; clientVersion: string }, | ||
{ clientId: string }, | ||
]; | ||
'space:leave': { spaceType: string; spaceId: string }; | ||
'space:push-doc-updates': [ | ||
{ spaceType: string; spaceId: string; docId: string; updates: string[] }, | ||
{ timestamp: number }, | ||
]; | ||
'space:load-doc-timestamps': [ | ||
{ | ||
spaceType: string; | ||
spaceId: string; | ||
timestamp?: number; | ||
}, | ||
Record<string, number>, | ||
]; | ||
'space:load-doc': [ | ||
{ | ||
spaceType: string; | ||
spaceId: string; | ||
docId: string; | ||
stateVector?: string; | ||
}, | ||
{ | ||
missing: string; | ||
state: string; | ||
timestamp: number; | ||
}, | ||
]; | ||
} | ||
|
||
type ServerEventsMap = { | ||
[Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void; | ||
}; | ||
type ClientEventsMap = { | ||
[Key in keyof ClientEvents]: ClientEvents[Key] extends Array<any> | ||
? ( | ||
data: ClientEvents[Key][0], | ||
ack: (res: WebsocketResponse<ClientEvents[Key][1]>) => void | ||
) => void | ||
: (data: ClientEvents[Key]) => void; | ||
}; | ||
|
||
export class AFFiNECloudDocStorageAdapter extends DocStorageAdapter { | ||
constructor( | ||
private readonly spaceType: string, | ||
protected override spaceId: string, | ||
private readonly socket: Socket<ServerEventsMap, ClientEventsMap> | ||
) { | ||
super(); | ||
} | ||
|
||
async connect(): Promise<void> { | ||
// the event will be polled, there is no need to wait for socket to be connected | ||
await this.clientHandShake(); | ||
this.socket.on('space:broadcast-doc-updates', this.onServerUpdates); | ||
} | ||
|
||
private async clientHandShake() { | ||
const res = await this.socket.emitWithAck('space:join', { | ||
spaceType: this.spaceType, | ||
spaceId: this.spaceId, | ||
clientVersion: runtimeConfig.appVersion, | ||
}); | ||
|
||
if ('error' in res) { | ||
// TODO(@forehalo): use [UserFriendlyError] | ||
throw new Error(res.error.message); | ||
} | ||
} | ||
|
||
async disconnect(): Promise<void> { | ||
this.socket.emit('space:leave', { | ||
spaceType: this.spaceType, | ||
spaceId: this.spaceId, | ||
}); | ||
this.socket.off('space:broadcast-doc-updates', this.onServerUpdates); | ||
} | ||
|
||
onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => { | ||
if ( | ||
this.spaceType === message.spaceType && | ||
this.spaceId === message.spaceId | ||
) { | ||
this.dispatchDocUpdatesListeners( | ||
message.docId, | ||
message.updates.map(base64ToUint8Array), | ||
message.timestamp | ||
); | ||
} | ||
}; | ||
|
||
override async getDocDiff(docId: string, stateVector?: Uint8Array) { | ||
const response = await this.socket.emitWithAck('space:load-doc', { | ||
spaceType: this.spaceType, | ||
spaceId: this.spaceId, | ||
docId, | ||
stateVector: stateVector ? await uint8ArrayToBase64(stateVector) : void 0, | ||
}); | ||
|
||
if ('error' in response) { | ||
// TODO: use [UserFriendlyError] | ||
throw new Error(response.error.message); | ||
} | ||
|
||
return { | ||
missing: base64ToUint8Array(response.data.missing), | ||
state: base64ToUint8Array(response.data.state), | ||
timestamp: response.data.timestamp, | ||
}; | ||
} | ||
|
||
async pushDocUpdates(docId: string, updates: Uint8Array[]): Promise<number> { | ||
const response = await this.socket.emitWithAck('space:push-doc-updates', { | ||
spaceType: this.spaceType, | ||
spaceId: this.spaceId, | ||
docId, | ||
updates: await Promise.all(updates.map(uint8ArrayToBase64)), | ||
}); | ||
|
||
if ('error' in response) { | ||
// TODO(@forehalo): use [UserFriendlyError] | ||
throw new Error(response.error.message); | ||
} | ||
|
||
return response.data.timestamp; | ||
} | ||
|
||
async getSpaceDocTimestamps( | ||
after?: number | ||
): Promise<Record<string, number> | null> { | ||
const response = await this.socket.emitWithAck( | ||
'space:load-doc-timestamps', | ||
{ | ||
spaceType: this.spaceType, | ||
spaceId: this.spaceId, | ||
timestamp: after, | ||
} | ||
); | ||
|
||
if ('error' in response) { | ||
// TODO(@forehalo): use [UserFriendlyError] | ||
throw new Error(response.error.message); | ||
} | ||
|
||
return response.data; | ||
} | ||
|
||
// TODO(@forehalo): is there a good way to avoid this boilerplate? | ||
async deleteDoc(): Promise<void> {} | ||
async deleteSpace(): Promise<void> {} | ||
async listDocHistories() { | ||
return []; | ||
} | ||
async getDocHistory() { | ||
return null; | ||
} | ||
protected async getDocSnapshot() { | ||
return null; | ||
} | ||
protected async setDocSnapshot() { | ||
return false; | ||
} | ||
protected async getDocUpdates() { | ||
return []; | ||
} | ||
protected async markUpdatesMerged() { | ||
return 0; | ||
} | ||
protected async createDocHistory() { | ||
return false; | ||
} | ||
} | ||
|
||
export function uint8ArrayToBase64(array: Uint8Array): Promise<string> { | ||
return new Promise<string>(resolve => { | ||
// Create a blob from the Uint8Array | ||
const blob = new Blob([array]); | ||
|
||
const reader = new FileReader(); | ||
reader.onload = function () { | ||
const dataUrl = reader.result as string | null; | ||
if (!dataUrl) { | ||
resolve(''); | ||
return; | ||
} | ||
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data | ||
const base64 = dataUrl.split(',')[1]; | ||
resolve(base64); | ||
}; | ||
|
||
reader.readAsDataURL(blob); | ||
}); | ||
} | ||
|
||
export function base64ToUint8Array(base64: string) { | ||
const binaryString = atob(base64); | ||
const binaryArray = binaryString.split('').map(function (char) { | ||
return char.charCodeAt(0); | ||
}); | ||
return new Uint8Array(binaryArray); | ||
} |
Oops, something went wrong.