Skip to content

Commit

Permalink
Fixed issues with reconnecting to the same Room object (#280)
Browse files Browse the repository at this point in the history
* Fixed issues with reconnecting to the same Room object

Previously the old RTCEngine would have removed all of its listeners
after a failure, causing Room to miss all of the engine events.

* changeset
  • Loading branch information
davidzhao authored Jun 24, 2022
1 parent c4bec15 commit b0a5f6a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/twelve-fans-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Fixed reconnection with the same Room object
51 changes: 36 additions & 15 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
})
.on(EngineEvent.Restarting, this.handleRestarting)
.on(EngineEvent.Restarted, this.handleRestarted);

if (this.localParticipant) {
this.localParticipant.engine = this.engine;
}
}

/**
Expand Down Expand Up @@ -287,7 +291,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.metadata = joinResponse.room!.metadata;
this.emit(RoomEvent.SignalConnected);
} catch (err) {
this.engine.close();
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
throw err;
}
Expand All @@ -296,14 +300,14 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return new Promise<Room>((resolve, reject) => {
const connectTimeout = setTimeout(() => {
// timeout
this.engine.close();
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
reject(new ConnectionError('could not connect after timeout'));
}, maxICEConnectTimeout);
const abortHandler = () => {
log.warn('closing engine');
clearTimeout(connectTimeout);
this.engine.close();
this.recreateEngine();
this.setAndEmitConnectionState(ConnectionState.Disconnected);
reject(new ConnectionError('room connection has been cancelled'));
};
Expand Down Expand Up @@ -495,6 +499,18 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private recreateEngine() {
this.engine.close();
/* @ts-ignore */
this.engine = undefined;

// clear out existing remote participants, since they may have attached
// the old engine
this.participants.clear();

this.createEngine();
}

private onTrackAdded(
mediaTrack: MediaStreamTrack,
stream: MediaStream,
Expand All @@ -508,7 +524,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (this.state === ConnectionState.Connecting || this.state === ConnectionState.Reconnecting) {
setTimeout(() => {
this.onTrackAdded(mediaTrack, stream, receiver);
}, 10);
}, 50);
return;
}
if (this.state === ConnectionState.Disconnected) {
Expand Down Expand Up @@ -538,14 +554,14 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}

private handleRestarting = () => {
if (this.setAndEmitConnectionState(ConnectionState.Reconnecting)) {
this.emit(RoomEvent.Reconnecting);
}

// also unwind existing participants & existing subscriptions
for (const p of this.participants.values()) {
this.handleParticipantDisconnected(p.sid, p);
}

if (this.setAndEmitConnectionState(ConnectionState.Reconnecting)) {
this.emit(RoomEvent.Reconnecting);
}
};

private handleRestarted = async (joinResponse: JoinResponse) => {
Expand Down Expand Up @@ -650,7 +666,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.handleParticipantDisconnected(info.sid, remoteParticipant);
} else if (isNewParticipant) {
// fire connected event
this.emit(RoomEvent.ParticipantConnected, remoteParticipant);
this.emitWhenConnected(RoomEvent.ParticipantConnected, remoteParticipant);
} else {
// just update, no events
remoteParticipant.updateInfo(info);
Expand All @@ -669,7 +685,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
participant.tracks.forEach((publication) => {
participant.unpublishTrack(publication.trackSid, true);
});
this.emit(RoomEvent.ParticipantDisconnected, participant);
this.emitWhenConnected(RoomEvent.ParticipantDisconnected, participant);
}

// updates are sent only when there's a change to speaker ordering
Expand Down Expand Up @@ -704,7 +720,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});

this.activeSpeakers = activeSpeakers;
this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
this.emitWhenConnected(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
};

// process list of changed speakers
Expand Down Expand Up @@ -733,7 +749,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
const activeSpeakers = Array.from(lastSpeakers.values());
activeSpeakers.sort((a, b) => b.audioLevel - a.audioLevel);
this.activeSpeakers = activeSpeakers;
this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
this.emitWhenConnected(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
};

private handleStreamStateUpdate = (streamStateUpdate: StreamStateUpdate) => {
Expand All @@ -748,7 +764,12 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
pub.track.streamState = Track.streamStateFromProto(streamState.state);
participant.emit(ParticipantEvent.TrackStreamStateChanged, pub, pub.track.streamState);
this.emit(ParticipantEvent.TrackStreamStateChanged, pub, pub.track.streamState, participant);
this.emitWhenConnected(
ParticipantEvent.TrackStreamStateChanged,
pub,
pub.track.streamState,
participant,
);
});
};

Expand All @@ -768,7 +789,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
pub,
pub.subscriptionStatus,
);
this.emit(
this.emitWhenConnected(
ParticipantEvent.TrackSubscriptionPermissionChanged,
pub,
pub.subscriptionStatus,
Expand Down Expand Up @@ -809,7 +830,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)

private handleRoomUpdate = (r: RoomModel) => {
this.metadata = r.metadata;
this.emit(RoomEvent.RoomMetadataChanged, r.metadata);
this.emitWhenConnected(RoomEvent.RoomMetadataChanged, r.metadata);
};

private handleConnectionQualityUpdate = (update: ConnectionQualityUpdate) => {
Expand Down
5 changes: 3 additions & 2 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ export default class LocalParticipant extends Participant {
/** map of track sid => all published tracks */
tracks: Map<string, LocalTrackPublication>;

/** @internal */
engine: RTCEngine;

private pendingPublishing = new Set<Track.Source>();

private cameraError: Error | undefined;

private microphoneError: Error | undefined;

private engine: RTCEngine;

private participantTrackPermissions: Array<ParticipantTrackPermission> = [];

private allParticipantsAllowedToSubscribe: boolean = true;
Expand Down

0 comments on commit b0a5f6a

Please sign in to comment.