Skip to content
Merged
10 changes: 10 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,14 @@ declare namespace ChannelModes {
* The client can receive presence messages.
*/
type PRESENCE_SUBSCRIBE = 'PRESENCE_SUBSCRIBE';
/**
* The client can publish LiveObjects state messages.
*/
type STATE_PUBLISH = 'STATE_PUBLISH';
/**
* The client can receive LiveObjects state messages.
*/
type STATE_SUBSCRIBE = 'STATE_SUBSCRIBE';
/**
* The client is resuming an existing connection.
*/
Expand All @@ -885,6 +893,8 @@ export type ChannelMode =
| ChannelModes.SUBSCRIBE
| ChannelModes.PRESENCE
| ChannelModes.PRESENCE_SUBSCRIBE
| ChannelModes.STATE_PUBLISH
| ChannelModes.STATE_SUBSCRIBE
| ChannelModes.ATTACH_RESUME;

/**
Expand Down
4 changes: 3 additions & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 99, gzip: 30 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down Expand Up @@ -314,6 +314,8 @@ async function checkLiveObjectsPluginFiles() {
'src/plugins/liveobjects/liveobject.ts',
'src/plugins/liveobjects/liveobjects.ts',
'src/plugins/liveobjects/liveobjectspool.ts',
'src/plugins/liveobjects/statemessage.ts',
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
]);

return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);
Expand Down
3 changes: 3 additions & 0 deletions src/common/lib/client/baserealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import { ModularPlugins, RealtimePresencePlugin } from './modularplugins';
import { TransportNames } from 'common/constants/TransportName';
import { TransportImplementations } from 'common/platform';
import Defaults from '../util/defaults';
import type * as LiveObjectsPlugin from 'plugins/liveobjects';

/**
`BaseRealtime` is an export of the tree-shakable version of the SDK, and acts as the base class for the `DefaultRealtime` class exported by the non tree-shakable version.
*/
class BaseRealtime extends BaseClient {
readonly _RealtimePresence: RealtimePresencePlugin | null;
readonly _LiveObjectsPlugin: typeof LiveObjectsPlugin | null;
// Extra transport implementations available to this client, in addition to those in Platform.Transports.bundledImplementations
readonly _additionalTransportImplementations: TransportImplementations;
_channels: any;
Expand Down Expand Up @@ -58,6 +60,7 @@ class BaseRealtime extends BaseClient {

this._additionalTransportImplementations = BaseRealtime.transportImplementationsFromPlugins(this.options.plugins);
this._RealtimePresence = this.options.plugins?.RealtimePresence ?? null;
this._LiveObjectsPlugin = this.options.plugins?.LiveObjects ?? null;
this.connection = new Connection(this, this.options);
this._channels = new Channels(this);
if (this.options.autoConnect !== false) this.connect();
Expand Down
49 changes: 47 additions & 2 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Message, {
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
decodeData,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
Expand Down Expand Up @@ -533,12 +534,18 @@ class RealtimeChannel extends EventEmitter {
const resumed = message.hasFlag('RESUMED');
const hasPresence = message.hasFlag('HAS_PRESENCE');
const hasBacklog = message.hasFlag('HAS_BACKLOG');
const hasState = message.hasFlag('HAS_STATE');
if (this.state === 'attached') {
if (!resumed) {
/* On a loss of continuity, the presence set needs to be re-synced */
// we have lost continuity.
// the presence set needs to be re-synced
if (this._presence) {
this._presence.onAttached(hasPresence);
}
// the Live Objects state needs to be re-synced
if (this._liveObjects) {
this._liveObjects.onAttached(hasState);
}
}
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
this._allChannelChanges.emit('update', change);
Expand All @@ -549,7 +556,7 @@ class RealtimeChannel extends EventEmitter {
/* RTL5i: re-send DETACH and remain in the 'detaching' state */
this.checkPendingState();
} else {
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog);
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog, hasState);
}
break;
}
Expand Down Expand Up @@ -613,6 +620,40 @@ class RealtimeChannel extends EventEmitter {
}
break;
}

case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}

this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);

break;
}

case actions.MESSAGE: {
//RTL17
if (this.state !== 'attached') {
Expand Down Expand Up @@ -743,6 +784,7 @@ class RealtimeChannel extends EventEmitter {
resumed?: boolean,
hasPresence?: boolean,
hasBacklog?: boolean,
hasState?: boolean,
): void {
Logger.logAction(
this.logger,
Expand All @@ -763,6 +805,9 @@ class RealtimeChannel extends EventEmitter {
if (this._presence) {
this._presence.actOnChannelState(state, hasPresence, reason);
}
if (this._liveObjects) {
this._liveObjects.actOnChannelState(state, hasState);
}
if (state === 'suspended' && this.connectionManager.state.sendEvents) {
this.startRetryTimer();
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/comettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ abstract class CometTransport extends Transport {
if (items && items.length)
for (let i = 0; i < items.length; i++)
this.onProtocolMessage(
protocolMessageFromDeserialized(items[i], this.connectionManager.realtime._RealtimePresence),
protocolMessageFromDeserialized(
items[i],
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
),
);
} catch (e) {
Logger.logAction(
Expand Down
3 changes: 2 additions & 1 deletion src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,8 @@ class ConnectionManager extends EventEmitter {

Logger.LOG_MICRO,
'ConnectionManager.send()',
'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence),
'queueing msg; ' +
stringifyProtocolMessage(msg, this.realtime._RealtimePresence, this.realtime._LiveObjectsPlugin),
);
}
this.queue(msg, callback);
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ class Protocol extends EventEmitter {
Logger.LOG_MICRO,
'Protocol.send()',
'sending msg; ' +
stringifyProtocolMessage(pendingMessage.message, this.transport.connectionManager.realtime._RealtimePresence),
stringifyProtocolMessage(
pendingMessage.message,
this.transport.connectionManager.realtime._RealtimePresence,
this.transport.connectionManager.realtime._LiveObjectsPlugin,
),
);
}
pendingMessage.sendAttempted = true;
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ abstract class Transport extends EventEmitter {
'received on ' +
this.shortName +
': ' +
stringifyProtocolMessage(message, this.connectionManager.realtime._RealtimePresence) +
stringifyProtocolMessage(
message,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
) +
'; connectionId = ' +
this.connectionManager.connectionId,
);
Expand Down
1 change: 1 addition & 0 deletions src/common/lib/transport/websockettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class WebSocketTransport extends Transport {
data,
this.connectionManager.realtime._MsgPack,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
this.format,
),
);
Expand Down
69 changes: 51 additions & 18 deletions src/common/lib/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,36 @@ export async function decode(
message: Message | PresenceMessage,
inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions,
): Promise<void> {
const { data, encoding, error } = await decodeData(message.data, message.encoding, inputContext);
message.data = data;
message.encoding = encoding;

if (error) {
throw error;
}
}

export async function decodeData(
data: any,
encoding: string | null | undefined,
inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions,
): Promise<{
error?: ErrorInfo;
data: any;
encoding: string | null | undefined;
}> {
const context = normaliseContext(inputContext);
let lastPayload = data;
let decodedData = data;
let finalEncoding: string | null | undefined = encoding;
let decodingError: ErrorInfo | undefined = undefined;

let lastPayload = message.data;
const encoding = message.encoding;
if (encoding) {
const xforms = encoding.split('/');
let lastProcessedEncodingIndex,
encodingsToProcess = xforms.length,
data = message.data;

let lastProcessedEncodingIndex;
let encodingsToProcess = xforms.length;
let xform = '';

try {
while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) {
// eslint-disable-next-line security/detect-unsafe-regex
Expand All @@ -173,16 +192,16 @@ export async function decode(
xform = match[1];
switch (xform) {
case 'base64':
data = Platform.BufferUtils.base64Decode(String(data));
decodedData = Platform.BufferUtils.base64Decode(String(decodedData));
if (lastProcessedEncodingIndex == xforms.length) {
lastPayload = data;
lastPayload = decodedData;
}
continue;
case 'utf-8':
data = Platform.BufferUtils.utf8Decode(data);
decodedData = Platform.BufferUtils.utf8Decode(decodedData);
continue;
case 'json':
data = JSON.parse(data);
decodedData = JSON.parse(decodedData);
continue;
case 'cipher':
if (
Expand All @@ -196,7 +215,7 @@ export async function decode(
if (xformAlgorithm != cipher.algorithm) {
throw new Error('Unable to decrypt message with given cipher; incompatible cipher params');
}
data = await cipher.decrypt(data);
decodedData = await cipher.decrypt(decodedData);
continue;
} else {
throw new Error('Unable to decrypt message; not an encrypted channel');
Expand All @@ -220,10 +239,12 @@ export async function decode(

// vcdiff expects Uint8Arrays, can't copy with ArrayBuffers.
const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer);
data = Platform.BufferUtils.toBuffer(data);
decodedData = Platform.BufferUtils.toBuffer(decodedData);

data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer));
lastPayload = data;
decodedData = Platform.BufferUtils.arrayBufferViewToBuffer(
context.plugins.vcdiff.decode(decodedData, deltaBaseBuffer),
);
lastPayload = decodedData;
} catch (e) {
throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400);
}
Expand All @@ -234,18 +255,30 @@ export async function decode(
}
} catch (e) {
const err = e as ErrorInfo;
throw new ErrorInfo(
'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’',
decodingError = new ErrorInfo(
`Error processing the ${xform} encoding, decoder returned ‘${err.message}’`,
err.code || 40013,
400,
);
} finally {
message.encoding =
finalEncoding =
(lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/');
message.data = data;
}
}

if (decodingError) {
return {
error: decodingError,
data: decodedData,
encoding: finalEncoding,
};
}

context.baseEncodedPreviousPayload = lastPayload;
return {
data: decodedData,
encoding: finalEncoding,
};
}

export async function fromResponseBody(
Expand Down
Loading