Skip to content

Commit

Permalink
Improve duplicate connect (#282)
Browse files Browse the repository at this point in the history
* Improved duplicate connect handling

When a user calls connect multiple times, the behavior is now consistent
where it returns a promise so caller wait to see if connection is
successful or otherwise.

* changeset
  • Loading branch information
davidzhao authored Jun 25, 2022
1 parent ed7a612 commit 20584eb
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/serious-timers-add.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Improved duplicate connect handling
108 changes: 73 additions & 35 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { Track } from './track/Track';
import { TrackPublication } from './track/TrackPublication';
import { AdaptiveStreamSettings, RemoteTrack } from './track/types';
import { getNewAudioContext } from './track/utils';
import { isWeb, unpackStreamId } from './utils';
import { Future, isWeb, unpackStreamId } from './utils';

export enum ConnectionState {
Disconnected = 'disconnected',
Expand Down Expand Up @@ -98,6 +98,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
/** used for aborting pending connections to a LiveKit server */
private abortController?: AbortController;

private connectFuture?: Future<void>;

/**
* Creates a new Room, the primary construct for a LiveKit session.
* @param options
Expand Down Expand Up @@ -191,13 +193,16 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return DeviceManager.getInstance().getDevices(kind, requestPermissions);
}

connect = async (url: string, token: string, opts?: RoomConnectOptions) => {
// guard against calling connect
if (this.state !== ConnectionState.Disconnected) {
connect = async (url: string, token: string, opts?: RoomConnectOptions): Promise<void> => {
if (this.state === ConnectionState.Connected) {
// when the state is reconnecting or connected, this function returns immediately
log.warn(`already connected to room ${this.name}`);
return;
}

if (this.connectFuture) {
return this.connectFuture.promise;
}
this.setAndEmitConnectionState(ConnectionState.Connecting);

if (!this.abortController || this.abortController.signal.aborted) {
Expand Down Expand Up @@ -292,42 +297,51 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.emit(RoomEvent.SignalConnected);
} catch (err) {
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('could not establish signal connection'),
);
throw err;
}

// don't return until ICE connected
return new Promise<Room>((resolve, reject) => {
const connectTimeout = setTimeout(() => {
// timeout
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
reject(new ConnectionError('could not connect after timeout'));
}, maxICEConnectTimeout);
const abortHandler = () => {
log.warn('closing engine');
clearTimeout(connectTimeout);
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
reject(new ConnectionError('room connection has been cancelled'));
};
if (this.abortController?.signal.aborted) {
abortHandler();
const connectTimeout = setTimeout(() => {
// timeout
this.recreateEngine();
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('could not connect PeerConnection after timeout'),
);
}, maxICEConnectTimeout);
const abortHandler = () => {
log.warn('closing engine');
clearTimeout(connectTimeout);
this.recreateEngine();
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('room connection has been cancelled'),
);
};
if (this.abortController?.signal.aborted) {
abortHandler();
}
this.abortController?.signal.addEventListener('abort', abortHandler);

this.engine.once(EngineEvent.Connected, () => {
clearTimeout(connectTimeout);
this.abortController?.signal.removeEventListener('abort', abortHandler);
// also hook unload event
if (isWeb()) {
window.addEventListener('beforeunload', this.onBeforeUnload);
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
}
this.abortController?.signal.addEventListener('abort', abortHandler);

this.engine.once(EngineEvent.Connected, () => {
clearTimeout(connectTimeout);
this.abortController?.signal.removeEventListener('abort', abortHandler);
// also hook unload event
if (isWeb()) {
window.addEventListener('beforeunload', this.onBeforeUnload);
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
}
this.setAndEmitConnectionState(ConnectionState.Connected);
resolve(this);
});
this.setAndEmitConnectionState(ConnectionState.Connected);
});

if (this.connectFuture) {
/** @ts-ignore */
return this.connectFuture.promise;
}
};

/**
Expand Down Expand Up @@ -996,11 +1010,35 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private setAndEmitConnectionState(state: ConnectionState): boolean {
private setAndEmitConnectionState(state: ConnectionState, error?: Error): boolean {
if (state === this.state) {
// unchanged
return false;
}
switch (state) {
case ConnectionState.Connecting:
case ConnectionState.Reconnecting:
if (!this.connectFuture) {
// reuse existing connect future if possible
this.connectFuture = new Future<void>();
}
break;
case ConnectionState.Connected:
if (this.connectFuture) {
this.connectFuture.resolve();
this.connectFuture = undefined;
}
break;
case ConnectionState.Disconnected:
if (this.connectFuture) {
error ??= new Error('disconnected from Room');
this.connectFuture.reject(error);
this.connectFuture = undefined;
}
break;
default:
// nothing
}
this.state = state;
this.emit(RoomEvent.ConnectionStateChanged, this.state);
return true;
Expand Down
15 changes: 15 additions & 0 deletions src/room/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,18 @@ export function getEmptyAudioStreamTrack() {
}
return emptyAudioStreamTrack;
}

export class Future<T> {
promise: Promise<T>;

resolve!: (arg: T) => void;

reject!: (e: any) => void;

constructor() {
this.promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
}

0 comments on commit 20584eb

Please sign in to comment.