From bcfea1e625f3cc268535394f3b83553a5efab227 Mon Sep 17 00:00:00 2001 From: CloudWebRTC Date: Mon, 4 Jul 2022 11:11:56 +0800 Subject: [PATCH] handle combined participant update. (#130) * handle combined participant update (WIP). * always emit events for new publications. * update info for exist participant. * fix tests. * update. * Move emitWhenConnected to Room layer. --- lib/src/core/engine.dart | 21 +++++++++++++++++++++ lib/src/core/room.dart | 33 ++++++++++++++++++++++++++++----- lib/src/participant/remote.dart | 15 +++++++-------- lib/src/publication/remote.dart | 2 -- test/core/room_e2e_test.dart | 12 +++++++++--- 5 files changed, 65 insertions(+), 18 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 3bf087b02..7d6f58602 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -406,6 +406,27 @@ class Engine extends Disposable with EventsEmittable { logger.fine('[WebRTC] stream.onRemoveTrack'); }; + if (connectionState == ConnectionState.reconnecting || + connectionState == ConnectionState.connecting) { + final track = event.track; + final receiver = event.receiver; + events.on((event) async { + Timer(const Duration(milliseconds: 10), () { + events.emit(EngineTrackAddedEvent( + track: track, + stream: stream, + receiver: receiver, + )); + }); + }); + return; + } + + if (connectionState == ConnectionState.disconnected) { + logger.warning('skipping incoming track after Room disconnected'); + return; + } + events.emit(EngineTrackAddedEvent( track: event.track, stream: stream, diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 9ca41e21d..b914d26d8 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1,4 +1,5 @@ import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; import '../core/signal_client.dart'; import '../events.dart'; @@ -190,10 +191,16 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } // await publication.updateSubscriptionAllowed(event.allowed); + emitWhenConnected(TrackSubscriptionPermissionChangedEvent( + participant: participant, + publication: publication, + state: publication.subscriptionState, + )); }) ..on((event) async { _metadata = event.room.metadata; - events.emit(RoomMetadataChangedEvent(metadata: event.room.metadata)); + emitWhenConnected( + RoomMetadataChangedEvent(metadata: event.room.metadata)); }) ..on((event) { // during reconnection, need to send sync state upon signal connection. @@ -279,6 +286,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable { String sid, lk_models.ParticipantInfo? info) { RemoteParticipant? participant = _participants[sid]; if (participant != null) { + if (info != null) { + participant.updateFromInfo(info); + } return participant; } @@ -323,7 +333,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (isNew) { hasChanged = true; - events.emit(ParticipantConnectedEvent(participant: participant)); + // fire connected event + emitWhenConnected(ParticipantConnectedEvent(participant: participant)); } else { await participant.updateFromInfo(info); } @@ -357,7 +368,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final activeSpeakers = lastSpeakers.values.toList(); activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel)); _activeSpeakers = activeSpeakers; - events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } // from data channel @@ -391,7 +402,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } _activeSpeakers = activeSpeakers; - events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } void _onSignalConnectionQualityUpdateEvent( @@ -422,6 +433,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (trackPublication == null) continue; // update the stream state await trackPublication.updateStreamState(update.state.toLKType()); + emitWhenConnected(TrackStreamStateUpdatedEvent( + participant: participant, + publication: trackPublication, + streamState: update.state.toLKType(), + )); } } @@ -452,7 +468,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { await participant.unpublishAllTracks(notify: true); - events.emit(ParticipantDisconnectedEvent(participant: participant)); + emitWhenConnected(ParticipantDisconnectedEvent(participant: participant)); } Future _sendSyncState() async { @@ -512,6 +528,13 @@ extension RoomPrivateMethods on Room { _serverVersion = null; _serverRegion = null; } + + @internal + void emitWhenConnected(RoomEvent event) { + if (connectionState == ConnectionState.connected) { + events.emit(event); + } + } } extension RoomDebugMethods on Room { diff --git a/lib/src/participant/remote.dart b/lib/src/participant/remote.dart index aca4b31a0..5e33e2c41 100644 --- a/lib/src/participant/remote.dart +++ b/lib/src/participant/remote.dart @@ -138,7 +138,6 @@ class RemoteParticipant extends Participant { @internal Future updateFromInfo(lk_models.ParticipantInfo info) async { logger.fine('RemoteParticipant.updateFromInfo(info: $info)'); - final hadInfo = hasInfo; super.updateFromInfo(info); // figuring out deltas between tracks @@ -168,13 +167,13 @@ class RemoteParticipant extends Participant { } } - // notify listeners when it's not a new participant - if (hadInfo) { - for (final pub in newPubs) { - final event = TrackPublishedEvent( - participant: this, - publication: pub, - ); + // always emit events for new publications, Room will not forward them unless it's ready + for (final pub in newPubs) { + final event = TrackPublishedEvent( + participant: this, + publication: pub, + ); + if (room.connectionState == ConnectionState.connected) { [events, room.events].emit(event); } } diff --git a/lib/src/publication/remote.dart b/lib/src/publication/remote.dart index fccc33ed9..3c8f2de9a 100644 --- a/lib/src/publication/remote.dart +++ b/lib/src/publication/remote.dart @@ -69,7 +69,6 @@ class RemoteTrackPublication _streamState = streamState; [ participant.events, - participant.room.events, ].emit(TrackStreamStateUpdatedEvent( participant: participant, publication: this, @@ -304,7 +303,6 @@ class RemoteTrackPublication // emit events [ participant.events, - participant.room.events, ].emit(TrackSubscriptionPermissionChangedEvent( participant: participant, publication: this, diff --git a/test/core/room_e2e_test.dart b/test/core/room_e2e_test.dart index 73feab5d1..dcfa91eaf 100644 --- a/test/core/room_e2e_test.dart +++ b/test/core/room_e2e_test.dart @@ -36,10 +36,16 @@ void main() { test('participant join', () async { expect( room.events.streamCtrl.stream, - emits(predicate( - (event) => event.participant.sid == remoteParticipantData.sid, - )), + emitsInOrder([ + predicate( + (event) => event.participant.sid == remoteParticipantData.sid, + ), + predicate( + (event) => event.participant.sid == remoteParticipantData.sid, + ) + ]), ); + ws.onData(participantJoinResponse.writeToBuffer()); await room.events.waitFor(