Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/livekit_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export 'src/managers/event.dart';
export 'src/options.dart';
export 'src/participant/local.dart';
export 'src/participant/participant.dart';
export 'src/participant/remote.dart';
export 'src/participant/remote.dart' hide ParticipantCreationResult;
export 'src/publication/local.dart';
export 'src/publication/remote.dart';
export 'src/publication/track_publication.dart';
Expand Down
44 changes: 31 additions & 13 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -636,15 +636,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
return null;
}

Future<RemoteParticipant> _getOrCreateRemoteParticipant(String identity, lk_models.ParticipantInfo? info) async {
Future<ParticipantCreationResult> _getOrCreateRemoteParticipant(
String identity, lk_models.ParticipantInfo? info) async {
RemoteParticipant? participant = _remoteParticipants[identity];
if (participant != null) {
if (info != null) {
await participant.updateFromInfo(info);
}
return participant;
// Return existing participant with no new publications; caller handles updates.
return ParticipantCreationResult(
participant: participant,
newPublications: const [],
);
}

ParticipantCreationResult result;
if (info == null) {
logger.warning('RemoteParticipant.info is null identity: $identity');
participant = RemoteParticipant(
Expand All @@ -653,16 +656,20 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
identity: identity,
name: '',
);
result = ParticipantCreationResult(
participant: participant,
newPublications: const [],
);
} else {
participant = await RemoteParticipant.createFromInfo(
result = await RemoteParticipant.createFromInfo(
room: this,
info: info,
);
}

_remoteParticipants[identity] = participant;
_sidToIdentity[participant.sid] = identity;
return participant;
_remoteParticipants[result.participant.identity] = result.participant;
_sidToIdentity[result.participant.sid] = result.participant.identity;
return result;
}

Future<void> _onParticipantUpdateEvent(List<lk_models.ParticipantInfo> updates) async {
Expand All @@ -689,14 +696,25 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
continue;
}

final participant = await _getOrCreateRemoteParticipant(info.identity, info);
final result = await _getOrCreateRemoteParticipant(info.identity, info);

if (isNew) {
hasChanged = true;
// fire connected event
emitWhenConnected(ParticipantConnectedEvent(participant: participant));
// Emit connected event
emitWhenConnected(ParticipantConnectedEvent(participant: result.participant));
// Emit TrackPublishedEvent for each new track
if (connectionState == ConnectionState.connected) {
for (final pub in result.newPublications) {
final event = TrackPublishedEvent(
participant: result.participant,
publication: pub,
);
[result.participant.events, events].emit(event);
}
}
_sidToIdentity[info.sid] = info.identity;
} else {
final wasUpdated = await participant.updateFromInfo(info);
final wasUpdated = await result.participant.updateFromInfo(info);
if (wasUpdated) {
_sidToIdentity[info.sid] = info.identity;
}
Expand Down
122 changes: 88 additions & 34 deletions lib/src/participant/remote.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@ import '../track/remote/video.dart';
import '../types/other.dart';
import 'participant.dart';

/// Result of creating a RemoteParticipant with all its initial data populated.
///
/// This struct is returned by [RemoteParticipant.createFromInfo] and contains
/// the fully initialized participant along with the list of track publications
/// that were added during creation. The caller is responsible for emitting
/// events in the correct order.
@internal
class ParticipantCreationResult {
/// The fully initialized remote participant with all basic info and tracks populated.
final RemoteParticipant participant;

/// List of new track publications that were added during participant creation.
/// The caller should emit [TrackPublishedEvent] for each of these after
/// emitting [ParticipantConnectedEvent].
final List<RemoteTrackPublication> newPublications;

const ParticipantCreationResult({
required this.participant,
required this.newPublications,
});
}

/// Represents other participant in the [Room].
class RemoteParticipant extends Participant<RemoteTrackPublication> {
@internal
Expand All @@ -46,18 +68,70 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
name: name,
);

static Future<RemoteParticipant> createFromInfo({
/// Creates a fully initialized RemoteParticipant without emitting events.
///
/// Populates the participant with all data from [info] including metadata, permissions,
/// and track publications. No events are emitted, allowing the caller to control event
/// timing and order.
///
/// Returns [ParticipantCreationResult] with the participant and new track publications.
/// The caller should emit [ParticipantConnectedEvent] first, then [TrackPublishedEvent]
/// for each track, ensuring the participant is fully populated when connected event fires.
///
/// @internal - Should only be called by [Room].
@internal
static Future<ParticipantCreationResult> createFromInfo({
required Room room,
required lk_models.ParticipantInfo info,
}) async {
final participant = RemoteParticipant(
room: room,
sid: info.identity,
sid: info.sid,
Comment on lines -55 to +89
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this also.

identity: info.identity,
name: info.name,
);
await participant.updateFromInfo(info);
return participant;
// Update basic participant info (state, metadata, etc.)
await participant._updateBasicInfo(info);
// Add tracks to participant without emitting events
final newPubs = await participant._addTracks(info.tracks);
// Return result for caller to emit events in correct order
return ParticipantCreationResult(
participant: participant,
newPublications: newPubs,
);
}

Future<void> _updateBasicInfo(lk_models.ParticipantInfo info) async {
// Only call superclass updateFromInfo to update basic participant state
await super.updateFromInfo(info);
}

Future<List<RemoteTrackPublication>> _addTracks(List<lk_models.TrackInfo> tracks) async {
final newPubs = <RemoteTrackPublication>[];
for (final trackInfo in tracks) {
final RemoteTrackPublication? pub = getTrackPublicationBySid(trackInfo.sid);
if (pub == null) {
final RemoteTrackPublication pub;
if (trackInfo.type == lk_models.TrackType.VIDEO) {
pub = RemoteTrackPublication<RemoteVideoTrack>(
participant: this,
info: trackInfo,
);
} else if (trackInfo.type == lk_models.TrackType.AUDIO) {
pub = RemoteTrackPublication<RemoteAudioTrack>(
participant: this,
info: trackInfo,
);
} else {
throw UnexpectedStateException('Unknown track type');
}
newPubs.add(pub);
addTrackPublication(pub);
} else {
pub.updateFromInfo(trackInfo);
}
}
return newPubs;
}

/// A convenience property to get all video tracks.
Expand Down Expand Up @@ -189,34 +263,16 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
//return false;
}

// figuring out deltas between tracks
final newPubs = <RemoteTrackPublication>{};
await _updateTracks(info.tracks);

for (final trackInfo in info.tracks) {
final RemoteTrackPublication? pub = getTrackPublicationBySid(trackInfo.sid);
if (pub == null) {
final RemoteTrackPublication pub;
if (trackInfo.type == lk_models.TrackType.VIDEO) {
pub = RemoteTrackPublication<RemoteVideoTrack>(
participant: this,
info: trackInfo,
);
} else if (trackInfo.type == lk_models.TrackType.AUDIO) {
pub = RemoteTrackPublication<RemoteAudioTrack>(
participant: this,
info: trackInfo,
);
} else {
throw UnexpectedStateException('Unknown track type');
}
newPubs.add(pub);
addTrackPublication(pub);
} else {
pub.updateFromInfo(trackInfo);
}
}
return true;
}

// always emit events for new publications, Room will not forward them unless it's ready
Future<void> _updateTracks(List<lk_models.TrackInfo> tracks) async {
// Add new tracks
final newPubs = await _addTracks(tracks);

// Emit events for new publications
for (final pub in newPubs) {
final event = TrackPublishedEvent(
participant: this,
Expand All @@ -227,14 +283,12 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
}
}

// remove any published track that is not in the info
final validSids = info.tracks.map((e) => e.sid);
// Remove any published track that is not in the info
final validSids = tracks.map((e) => e.sid);
final removeSids = trackPublications.keys.where((e) => !validSids.contains(e)).toSet();
for (final sid in removeSids) {
await removePublishedTrack(sid);
}

return true;
}

Future<void> removePublishedTrack(String trackSid, {bool notify = true}) async {
Expand Down
31 changes: 31 additions & 0 deletions test/core/room_e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@ void main() {
expect(room.remoteParticipants.length, 1);
});

test('participant join with tracks populated before connected event', () async {
// Track whether participant has tracks when connected event fires
bool participantHadTracksOnConnect = false;
int trackCountOnConnect = 0;

// Listen for ParticipantConnectedEvent
final cancel = room.events.on<ParticipantConnectedEvent>((event) {
// Verify participant is fully populated with tracks
trackCountOnConnect = event.participant.trackPublications.length;
participantHadTracksOnConnect = trackCountOnConnect > 0;
});

// Send participant join with tracks
ws.onData(participantJoinResponse.writeToBuffer());

// Wait for connected event
await room.events.waitFor<ParticipantConnectedEvent>(duration: const Duration(seconds: 1));

// Clean up listener
cancel();

// Verify participant had tracks when connected event was emitted
expect(participantHadTracksOnConnect, isTrue,
reason: 'Participant should have tracks when ParticipantConnectedEvent is emitted');
expect(trackCountOnConnect, greaterThan(0),
reason: 'Participant should have at least one track when connected event fires');

// Verify the participant is in the room
expect(room.remoteParticipants.length, 1);
});

test('participant disconnect', () async {
ws.onData(participantJoinResponse.writeToBuffer());
await room.events.waitFor<ParticipantConnectedEvent>(duration: const Duration(seconds: 1));
Expand Down