Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stuck reconnect #475

Merged
merged 19 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/clean-rings-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Fix reconnection attempts potentially getting stuck
36 changes: 20 additions & 16 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ export class SignalClient {

return new Promise<JoinResponse | void>((resolve, reject) => {
const abortHandler = () => {
ws.close();
this.close();
reject(new ConnectionError('room connection has been cancelled'));
};
Expand All @@ -202,12 +201,14 @@ export class SignalClient {
}
abortSignal?.addEventListener('abort', abortHandler);
log.debug(`connecting to ${url + params}`);
this.ws = undefined;
const ws = new WebSocket(url + params);
ws.binaryType = 'arraybuffer';
if (this.ws) {
this.close();
}
this.ws = new WebSocket(url + params);
this.ws.binaryType = 'arraybuffer';

ws.onerror = async (ev: Event) => {
if (!this.ws) {
this.ws.onerror = async (ev: Event) => {
if (!this.isConnected) {
try {
const resp = await fetch(`http${url.substring(2)}/validate${params}`);
if (!resp.ok) {
Expand Down Expand Up @@ -236,8 +237,7 @@ export class SignalClient {
this.handleWSError(ev);
};

ws.onopen = () => {
this.ws = ws;
this.ws.onopen = () => {
if (opts.reconnect) {
// upon reconnection, there will not be additional handshake
this.isConnected = true;
Expand All @@ -247,7 +247,7 @@ export class SignalClient {
}
};

ws.onmessage = async (ev: MessageEvent) => {
this.ws.onmessage = async (ev: MessageEvent) => {
// not considered connected until JoinResponse is received
let resp: SignalResponse;
if (typeof ev.data === 'string') {
Expand Down Expand Up @@ -288,23 +288,27 @@ export class SignalClient {
this.handleSignalResponse(resp);
};

ws.onclose = (ev: CloseEvent) => {
if (!this.isConnected || this.ws !== ws) return;
this.ws.onclose = (ev: CloseEvent) => {
if (!this.isConnected) return;

log.debug(`websocket connection closed: ${ev.reason}`);
this.isConnected = false;
if (this.onClose) this.onClose(ev.reason);
if (this.ws === ws) {
this.ws = undefined;
if (this.onClose) {
this.onClose(ev.reason);
}
this.ws = undefined;
};
});
}

close() {
this.isConnected = false;
if (this.ws) this.ws.onclose = null;
this.ws?.close();
if (this.ws) {
this.ws.onclose = null;
this.ws.onmessage = null;
this.ws.onopen = null;
this.ws.close();
}
this.ws = undefined;
this.clearPingInterval();
}
Expand Down
149 changes: 94 additions & 55 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.client = new SignalClient();
this.client.signalLatency = this.options.expSignalLatency;
this.reconnectPolicy = this.options.reconnectPolicy;
this.registerOnLineListener();
}

async join(
Expand Down Expand Up @@ -180,8 +181,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

close() {
this._isClosed = true;

this.removeAllListeners();
this.deregisterOnLineListener();
this.clearPendingReconnect();
if (this.publisher && this.publisher.pc.signalingState !== 'closed') {
this.publisher.pc.getSenders().forEach((sender) => {
try {
Expand Down Expand Up @@ -699,66 +701,74 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
this.reconnectTimeout = setTimeout(async () => {
if (this._isClosed) {
return;
this.reconnectTimeout = setTimeout(() => this.attemptReconnect(signalEvents), delay);
};

private async attemptReconnect(signalEvents: boolean = false) {
if (this._isClosed) {
return;
}
// guard for attempting reconnection multiple times while one attempt is still not finished
if (this.attemptingReconnect) {
return;
}
if (
this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED ||
// signaling state could change to closed due to hardware sleep
// those connections cannot be resumed
(this.primaryPC?.signalingState ?? 'closed') === 'closed'
) {
this.fullReconnectOnNext = true;
}

try {
this.attemptingReconnect = true;
if (this.fullReconnectOnNext) {
await this.restartConnection(signalEvents);
} else {
await this.resumeConnection(signalEvents);
}
// guard for attempting reconnection multiple times while one attempt is still not finished
if (this.attemptingReconnect) {
return;
this.reconnectAttempts = 0;
this.fullReconnectOnNext = false;
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
if (
this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED ||
// signaling state could change to closed due to hardware sleep
// those connections cannot be resumed
(this.primaryPC?.signalingState ?? 'closed') === 'closed'
) {
this.fullReconnectOnNext = true;
} catch (e) {
this.reconnectAttempts += 1;
let reconnectRequired = false;
let recoverable = true;
let requireSignalEvents = false;
if (e instanceof UnexpectedConnectionState) {
log.debug('received unrecoverable error', { error: e });
// unrecoverable
recoverable = false;
} else if (!(e instanceof SignalReconnectError)) {
// cannot resume
reconnectRequired = true;
}

try {
this.attemptingReconnect = true;
if (this.fullReconnectOnNext) {
await this.restartConnection(signalEvents);
} else {
await this.resumeConnection(signalEvents);
}
this.reconnectAttempts = 0;
this.fullReconnectOnNext = false;
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
} catch (e) {
this.reconnectAttempts += 1;
let reconnectRequired = false;
let recoverable = true;
let requireSignalEvents = false;
if (e instanceof UnexpectedConnectionState) {
log.debug('received unrecoverable error', { error: e });
// unrecoverable
recoverable = false;
} else if (!(e instanceof SignalReconnectError)) {
// cannot resume
reconnectRequired = true;
}

// when we flip from resume to reconnect
// we need to fire the right reconnecting events
if (reconnectRequired && !this.fullReconnectOnNext) {
this.fullReconnectOnNext = true;
requireSignalEvents = true;
}
// when we flip from resume to reconnect
// we need to fire the right reconnecting events
if (reconnectRequired && !this.fullReconnectOnNext) {
this.fullReconnectOnNext = true;
requireSignalEvents = true;
}

if (recoverable) {
this.handleDisconnect('reconnect', requireSignalEvents);
} else {
disconnect(Date.now() - this.reconnectStart);
}
} finally {
this.attemptingReconnect = false;
if (recoverable) {
this.handleDisconnect('reconnect', requireSignalEvents);
} else {
log.info(
`could not recover connection after ${this.reconnectAttempts} attempts, ${
Date.now() - this.reconnectStart
}ms. giving up`,
);
this.emit(EngineEvent.Disconnected);
this.close();
}
}, delay);
};
} finally {
this.attemptingReconnect = false;
}
}

private getNextRetryDelay(context: ReconnectContext) {
try {
Expand Down Expand Up @@ -971,6 +981,35 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
}
}

private clearPendingReconnect() {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
this.reconnectAttempts = 0;
}

private handleBrowserOnLine = () => {
// in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine'
if (this.client.isReconnecting) {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
this.attemptReconnect(true);
}
};

private registerOnLineListener() {
if (isWeb()) {
window.addEventListener('online', this.handleBrowserOnLine);
}
}

private deregisterOnLineListener() {
if (isWeb()) {
window.removeEventListener('online', this.handleBrowserOnLine);
}
}
}

async function getConnectedAddress(pc: RTCPeerConnection): Promise<string | undefined> {
Expand Down
Loading