Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/short-vans-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Send publisher offer with join request to accelerate connection
13 changes: 8 additions & 5 deletions examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,16 @@ const appActions = {
reject(error);
}
});
await Promise.all([
room.connect(url, token, connectOptions),
publishPromise.catch(appendLog),
]);
let connElapsed = 0;
const connectFn = async () => {
const connStartTime = Date.now();
await room.connect(url, token, connectOptions);
connElapsed = Date.now() - connStartTime;
};
await Promise.all([connectFn(), publishPromise.catch(appendLog)]);
const elapsed = Date.now() - startTime;
appendLog(
`successfully connected to ${room.name} in ${Math.round(elapsed)}ms`,
`successfully connected to ${room.name} in ${Math.round(elapsed)}ms (connect cost: ${Math.round(connElapsed)}ms)`,
await room.engine.getConnectedServerAddress(),
);
} catch (error: any) {
Expand Down
13 changes: 9 additions & 4 deletions src/api/SignalClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ describe('SignalClient.connect', () => {

describe('Failure Case - WebSocket Connection Errors', () => {
it('should reject with NotAllowed error for 4xx HTTP status', async () => {
const openedPromise = Promise.reject(new Error('Connection failed'));
openedPromise.catch(() => {}); // prevent unhandled rejection before join attaches its handler
mockWebSocketStream({
opened: Promise.reject(new Error('Connection failed')),
opened: openedPromise,
readyState: 3,
});

Expand All @@ -321,8 +323,10 @@ describe('SignalClient.connect', () => {
});

it('should reject with ServerUnreachable when fetch fails', async () => {
const openedPromise = Promise.reject(new Error('Connection failed'));
openedPromise.catch(() => {}); // prevent unhandled rejection before join attaches its handler
mockWebSocketStream({
opened: Promise.reject(new Error('Connection failed')),
opened: openedPromise,
readyState: 3,
});

Expand All @@ -338,9 +342,10 @@ describe('SignalClient.connect', () => {

it('should handle ConnectionError from WebSocket rejection', async () => {
const customError = ConnectionError.internal('Custom error', { status: 500 });

const openedPromise = Promise.reject(customError);
openedPromise.catch(() => {}); // prevent unhandled rejection before join attaches its handler
mockWebSocketStream({
opened: Promise.reject(customError),
opened: openedPromise,
readyState: 3,
});

Expand Down
53 changes: 46 additions & 7 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ import {
UpdateVideoLayers,
VideoLayer,
WrappedJoinRequest,
WrappedJoinRequest_Compression,
protoInt64,
} from '@livekit/protocol';
import log, { LoggerNames, getLogger } from '../logger';
import { ConnectionError } from '../room/errors';
import CriticalTimers from '../room/timers';
import type { LoggerOptions } from '../room/types';
import { getClientInfo, isReactNative, sleep } from '../room/utils';
import { getClientInfo, isCompressionStreamSupported, isReactNative, sleep } from '../room/utils';
import { AsyncQueue } from '../utils/AsyncQueue';
import { type WebSocketConnection, WebSocketStream } from './WebSocketStream';
import {
Expand Down Expand Up @@ -248,12 +249,13 @@ export class SignalClient {
opts: SignalOptions,
abortSignal?: AbortSignal,
useV0Path: boolean = false,
publisherOffer?: SessionDescription,
): Promise<JoinResponse> {
// during a full reconnect, we'd want to start the sequence even if currently
// connected
this.state = SignalConnectionState.CONNECTING;
this.options = opts;
const res = await this.connect(url, token, opts, abortSignal, useV0Path);
const res = await this.connect(url, token, opts, abortSignal, useV0Path, publisherOffer);
return res as JoinResponse;
}

Expand Down Expand Up @@ -296,6 +298,7 @@ export class SignalClient {
abortSignal?: AbortSignal,
/** setting this to true results in dual peer connection mode being used */
useV0Path: boolean = false,
publisherOffer?: SessionDescription,
): Promise<JoinResponse | ReconnectResponse | undefined> {
const unlock = await this.connectionLock.lock();

Expand All @@ -305,7 +308,7 @@ export class SignalClient {
const clientInfo = getClientInfo();
const params = useV0Path
? createConnectionParams(token, clientInfo, opts)
: createJoinRequestConnectionParams(token, clientInfo, opts);
: await createJoinRequestConnectionParams(token, clientInfo, opts, publisherOffer);
const rtcUrl = createRtcUrl(url, params, useV0Path).toString();
const validateUrl = createValidateUrl(rtcUrl).toString();

Expand Down Expand Up @@ -1125,11 +1128,12 @@ function createConnectionParams(
return params;
}

function createJoinRequestConnectionParams(
async function createJoinRequestConnectionParams(
token: string,
info: ClientInfo,
opts: ConnectOpts,
): URLSearchParams {
publisherOffer?: SessionDescription,
): Promise<URLSearchParams> {
const params = new URLSearchParams();
params.set('access_token', token);

Expand All @@ -1141,14 +1145,49 @@ function createJoinRequestConnectionParams(
}),
reconnect: !!opts.reconnect,
participantSid: opts.sid ? opts.sid : undefined,
publisherOffer: publisherOffer,
});
if (opts.reconnectReason) {
joinRequest.reconnectReason = opts.reconnectReason;
}
const joinRequestBytes = joinRequest.toBinary();
let requestBytes: Uint8Array;
let compression: WrappedJoinRequest_Compression;
if (isCompressionStreamSupported()) {
const stream = new CompressionStream('gzip');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need feature gating here to ensure this path is only used for browsers that support compression stream: https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 627d1ff

const writer = stream.writable.getWriter();
writer.write(new Uint8Array(joinRequestBytes));
writer.close();
const chunks: Uint8Array[] = [];
const reader = stream.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0);
const result = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.length;
}
requestBytes = result;
compression = WrappedJoinRequest_Compression.GZIP;
} else {
requestBytes = joinRequestBytes;
compression = WrappedJoinRequest_Compression.NONE;
}
const wrappedJoinRequest = new WrappedJoinRequest({
joinRequest: joinRequest.toBinary(),
joinRequest: requestBytes,
compression,
});
params.set('join_request', btoa(new TextDecoder('utf-8').decode(wrappedJoinRequest.toBinary())));
const wrappedBytes = wrappedJoinRequest.toBinary();
const bytesToBase64 = (bytes: Uint8Array) => {
const binString = Array.from(bytes, (byte) => String.fromCodePoint(byte)).join('');
return btoa(binString);
};
params.set('join_request', bytesToBase64(wrappedBytes).replace(/\+/g, '-').replace(/\//g, '_'));

return params;
}
42 changes: 41 additions & 1 deletion src/room/PCTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export default class PCTransport extends EventEmitter {

private offerLock: Mutex;

private pendingInitialOffer?: RTCSessionDescriptionInit;

pendingCandidates: RTCIceCandidateInit[] = [];

restartingIce: boolean = false;
Expand Down Expand Up @@ -163,6 +165,16 @@ export default class PCTransport extends EventEmitter {
this.remoteStereoMids = stereoMids;
this.remoteNackMids = nackMids;
} else if (sd.type === 'answer') {
if (this.pendingInitialOffer) {
const initialOffer = this.pendingInitialOffer;
this.pendingInitialOffer = undefined;
const sdpParsed = parse(initialOffer.sdp ?? '');
sdpParsed.media.forEach((media) => {
ensureIPAddrMatchVersion(media);
});
this.log.debug('setting pending initial offer before processing answer', this.logContext);
await this.setMungedSDP(initialOffer, write(sdpParsed));
}
const sdpParsed = parse(sd.sdp ?? '');
sdpParsed.media.forEach((media) => {
const mid = getMidString(media.mid!);
Expand Down Expand Up @@ -255,6 +267,31 @@ export default class PCTransport extends EventEmitter {
}
}, debounceInterval);

async createInitialOffer() {
const unlock = await this.offerLock.lock();
try {
if (this.pc.signalingState !== 'stable') {
this.log.warn(
'signaling state is not stable, cannot create initial offer',
this.logContext,
);
return;
}
const offerId = this.latestOfferId + 1;
this.latestOfferId = offerId;
const offer = await this.pc.createOffer();
this.pendingInitialOffer = { sdp: offer.sdp, type: offer.type };
const sdpParsed = parse(offer.sdp ?? '');
sdpParsed.media.forEach((media) => {
ensureIPAddrMatchVersion(media);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done when handling answer too. Is it needed in both places?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure server receives same offer as client

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thank you. I thought pendingInitialOffer generated here will already have the changes in-place and when it is read in answer path, it should have the changes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setMungedSDP in answer handle has a fallback path that use original sdp again if munged sdp is invalid(return error in setLocalDescription), ensureIPAddrMatchVersion shoud not make sdp invalid in theory but still keep the original sdp here to get a consistent behavior.

});
offer.sdp = write(sdpParsed);
return { offer, offerId };
} finally {
unlock();
}
}

async createAndSendOffer(options?: RTCOfferOptions) {
const unlock = await this.offerLock.lock();

Expand All @@ -268,7 +305,10 @@ export default class PCTransport extends EventEmitter {
this.restartingIce = true;
}

if (this._pc && this._pc.signalingState === 'have-local-offer') {
if (
this._pc &&
(this._pc.signalingState === 'have-local-offer' || this.pendingInitialOffer)
) {
// we're waiting for the peer to accept our offer, so we'll just wait
// the only exception to this is when ICE restart is needed
const currentSD = this._pc.remoteDescription;
Expand Down
2 changes: 1 addition & 1 deletion src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class PCTransportManager {
return this._mode;
}

constructor(rtcConfig: RTCConfiguration, mode: PCMode, loggerOptions: LoggerOptions) {
constructor(mode: PCMode, loggerOptions: LoggerOptions, rtcConfig?: RTCConfiguration) {
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCManager);
this.loggerOptions = loggerOptions;

Expand Down
Loading
Loading