Skip to content

Commit

Permalink
handle combined participant update. (livekit#130)
Browse files Browse the repository at this point in the history
* handle combined participant update (WIP).

* always emit events for new publications.

* update info for exist participant.

* fix tests.

* update.

* Move emitWhenConnected to Room layer.
  • Loading branch information
cloudwebrtc authored Jul 4, 2022
1 parent 8f877e9 commit bcfea1e
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 18 deletions.
21 changes: 21 additions & 0 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,27 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('[WebRTC] stream.onRemoveTrack');
};

if (connectionState == ConnectionState.reconnecting ||
connectionState == ConnectionState.connecting) {
final track = event.track;
final receiver = event.receiver;
events.on<EngineConnectionStateUpdatedEvent>((event) async {
Timer(const Duration(milliseconds: 10), () {
events.emit(EngineTrackAddedEvent(
track: track,
stream: stream,
receiver: receiver,
));
});
});
return;
}

if (connectionState == ConnectionState.disconnected) {
logger.warning('skipping incoming track after Room disconnected');
return;
}

events.emit(EngineTrackAddedEvent(
track: event.track,
stream: stream,
Expand Down
33 changes: 28 additions & 5 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:collection/collection.dart';
import 'package:meta/meta.dart';

import '../core/signal_client.dart';
import '../events.dart';
Expand Down Expand Up @@ -190,10 +191,16 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}
//
await publication.updateSubscriptionAllowed(event.allowed);
emitWhenConnected(TrackSubscriptionPermissionChangedEvent(
participant: participant,
publication: publication,
state: publication.subscriptionState,
));
})
..on<SignalRoomUpdateEvent>((event) async {
_metadata = event.room.metadata;
events.emit(RoomMetadataChangedEvent(metadata: event.room.metadata));
emitWhenConnected(
RoomMetadataChangedEvent(metadata: event.room.metadata));
})
..on<SignalConnectionStateUpdatedEvent>((event) {
// during reconnection, need to send sync state upon signal connection.
Expand Down Expand Up @@ -279,6 +286,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
String sid, lk_models.ParticipantInfo? info) {
RemoteParticipant? participant = _participants[sid];
if (participant != null) {
if (info != null) {
participant.updateFromInfo(info);
}
return participant;
}

Expand Down Expand Up @@ -323,7 +333,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

if (isNew) {
hasChanged = true;
events.emit(ParticipantConnectedEvent(participant: participant));
// fire connected event
emitWhenConnected(ParticipantConnectedEvent(participant: participant));
} else {
await participant.updateFromInfo(info);
}
Expand Down Expand Up @@ -357,7 +368,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
final activeSpeakers = lastSpeakers.values.toList();
activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel));
_activeSpeakers = activeSpeakers;
events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
}

// from data channel
Expand Down Expand Up @@ -391,7 +402,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}

_activeSpeakers = activeSpeakers;
events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
}

void _onSignalConnectionQualityUpdateEvent(
Expand Down Expand Up @@ -422,6 +433,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
if (trackPublication == null) continue;
// update the stream state
await trackPublication.updateStreamState(update.state.toLKType());
emitWhenConnected(TrackStreamStateUpdatedEvent(
participant: participant,
publication: trackPublication,
streamState: update.state.toLKType(),
));
}
}

Expand Down Expand Up @@ -452,7 +468,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

await participant.unpublishAllTracks(notify: true);

events.emit(ParticipantDisconnectedEvent(participant: participant));
emitWhenConnected(ParticipantDisconnectedEvent(participant: participant));
}

Future<void> _sendSyncState() async {
Expand Down Expand Up @@ -512,6 +528,13 @@ extension RoomPrivateMethods on Room {
_serverVersion = null;
_serverRegion = null;
}

@internal
void emitWhenConnected(RoomEvent event) {
if (connectionState == ConnectionState.connected) {
events.emit(event);
}
}
}

extension RoomDebugMethods on Room {
Expand Down
15 changes: 7 additions & 8 deletions lib/src/participant/remote.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
@internal
Future<void> updateFromInfo(lk_models.ParticipantInfo info) async {
logger.fine('RemoteParticipant.updateFromInfo(info: $info)');
final hadInfo = hasInfo;
super.updateFromInfo(info);

// figuring out deltas between tracks
Expand Down Expand Up @@ -168,13 +167,13 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
}
}

// notify listeners when it's not a new participant
if (hadInfo) {
for (final pub in newPubs) {
final event = TrackPublishedEvent(
participant: this,
publication: pub,
);
// always emit events for new publications, Room will not forward them unless it's ready
for (final pub in newPubs) {
final event = TrackPublishedEvent(
participant: this,
publication: pub,
);
if (room.connectionState == ConnectionState.connected) {
[events, room.events].emit(event);
}
}
Expand Down
2 changes: 0 additions & 2 deletions lib/src/publication/remote.dart
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class RemoteTrackPublication<T extends RemoteTrack>
_streamState = streamState;
[
participant.events,
participant.room.events,
].emit(TrackStreamStateUpdatedEvent(
participant: participant,
publication: this,
Expand Down Expand Up @@ -304,7 +303,6 @@ class RemoteTrackPublication<T extends RemoteTrack>
// emit events
[
participant.events,
participant.room.events,
].emit(TrackSubscriptionPermissionChangedEvent(
participant: participant,
publication: this,
Expand Down
12 changes: 9 additions & 3 deletions test/core/room_e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ void main() {
test('participant join', () async {
expect(
room.events.streamCtrl.stream,
emits(predicate<ParticipantConnectedEvent>(
(event) => event.participant.sid == remoteParticipantData.sid,
)),
emitsInOrder(<Matcher>[
predicate<TrackPublishedEvent>(
(event) => event.participant.sid == remoteParticipantData.sid,
),
predicate<ParticipantConnectedEvent>(
(event) => event.participant.sid == remoteParticipantData.sid,
)
]),
);

ws.onData(participantJoinResponse.writeToBuffer());

await room.events.waitFor<ParticipantConnectedEvent>(
Expand Down

0 comments on commit bcfea1e

Please sign in to comment.