Skip to content

Commit 5c7a985

Browse files
committed
async room.sid(&self)
1 parent 0289aab commit 5c7a985

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

livekit-ffi/src/conversion/room.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,9 @@ impl From<proto::AudioEncoding> for AudioEncoding {
213213
}
214214

215215
impl From<&FfiRoom> for proto::RoomInfo {
216-
fn from(value: &FfiRoom) -> Self {
216+
#[tokio::main]
217+
async fn from(value: &FfiRoom) -> Self {
217218
let room = &value.inner.room;
218-
Self { sid: room.sid().into(), name: room.name(), metadata: room.metadata() }
219+
Self { sid: room.sid().await.into(), name: room.name(), metadata: room.metadata() }
219220
}
220221
}

livekit/src/room/id.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct ParticipantIdentity(pub String);
1515
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
1616
pub struct TrackSid(String);
1717

18-
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
18+
#[derive(Clone, Default, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
1919
pub struct RoomSid(String);
2020

2121
impl From<String> for ParticipantIdentity {

livekit/src/room/mod.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,10 @@ pub struct Room {
246246
}
247247

248248
impl Debug for Room {
249-
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
249+
#[tokio::main]
250+
async fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
250251
f.debug_struct("Room")
251-
.field("sid", &self.sid())
252+
.field("sid", &self.sid().await)
252253
.field("name", &self.name())
253254
.field("connection_state", &self.connection_state())
254255
.finish()
@@ -262,7 +263,9 @@ struct RoomInfo {
262263

263264
pub(crate) struct RoomSession {
264265
rtc_engine: Arc<RtcEngine>,
265-
sid: RoomSid,
266+
sid_tx: AsyncMutex<Option<oneshot::Sender<()>>>,
267+
sid_rx: AsyncMutex<oneshot::Receiver<()>>,
268+
sid: RwLock<RoomSid>,
266269
name: String,
267270
info: RwLock<RoomInfo>,
268271
dispatcher: Dispatcher<RoomEvent>,
@@ -386,8 +389,11 @@ impl Room {
386389
});
387390

388391
let room_info = join_response.room.unwrap();
392+
let (send, recv) = oneshot::channel();
389393
let inner = Arc::new(RoomSession {
390-
sid: room_info.sid.try_into().unwrap(),
394+
sid_tx: AsyncMutex::new(Some(send)),
395+
sid_rx: AsyncMutex::new(recv),
396+
sid: Default::default(),
391397
name: room_info.name,
392398
info: RwLock::new(RoomInfo {
393399
state: ConnectionState::Disconnected,
@@ -403,6 +409,8 @@ impl Room {
403409
room_task: Default::default(),
404410
});
405411

412+
inner.set_sid(room_info.sid.try_into().unwrap());
413+
406414
e2ee_manager.on_state_changed({
407415
let dispatcher = dispatcher.clone();
408416
let inner = inner.clone();
@@ -476,8 +484,9 @@ impl Room {
476484
self.inner.dispatcher.register()
477485
}
478486

479-
pub fn sid(&self) -> RoomSid {
480-
self.inner.sid.clone()
487+
pub async fn sid(&self) -> RoomSid {
488+
let _ = self.inner.sid_rx.lock().await;
489+
self.inner.sid.read().clone()
481490
}
482491

483492
pub fn name(&self) -> String {
@@ -677,7 +686,12 @@ impl RoomSession {
677686
let participant_sid: ParticipantSid = participant_sid.to_owned().try_into().unwrap();
678687
let stream_id = stream_id.to_owned().try_into().unwrap();
679688

680-
let remote_participant = self.remote_participants.read().values().find(|x| { &x.sid() == &participant_sid }).cloned();
689+
let remote_participant = self
690+
.remote_participants
691+
.read()
692+
.values()
693+
.find(|x| &x.sid() == &participant_sid)
694+
.cloned();
681695

682696
if let Some(remote_participant) = remote_participant {
683697
livekit_runtime::spawn(async move {
@@ -836,6 +850,7 @@ impl RoomSession {
836850
metadata: info.metadata.clone(),
837851
});
838852
}
853+
self.set_sid(room.sid.try_into().unwrap());
839854
}
840855

841856
fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
@@ -1136,7 +1151,7 @@ impl RoomSession {
11361151
}
11371152

11381153
fn get_participant_by_sid(&self, sid: &ParticipantSid) -> Option<RemoteParticipant> {
1139-
self.remote_participants.read().values().find(|x| { &x.sid() == sid }).cloned()
1154+
self.remote_participants.read().values().find(|x| &x.sid() == sid).cloned()
11401155
}
11411156

11421157
fn get_participant_by_identity(
@@ -1145,6 +1160,22 @@ impl RoomSession {
11451160
) -> Option<RemoteParticipant> {
11461161
self.remote_participants.read().get(identity).cloned()
11471162
}
1163+
1164+
fn set_sid(&self, sid: RoomSid) {
1165+
let mut guard = self.sid_tx.blocking_lock();
1166+
let maybe_tx = guard.take();
1167+
if let Some(tx) = maybe_tx {
1168+
if !tx.is_closed() && sid != "".to_string().try_into().unwrap() {
1169+
let mut s = self.sid.write();
1170+
*s = sid;
1171+
let _ = tx.send(());
1172+
// self.sid_tx isn't replaced, remains None
1173+
} else {
1174+
// Sender isn't fired, gets put back in place
1175+
guard.replace(tx);
1176+
}
1177+
}
1178+
}
11481179
}
11491180

11501181
fn unpack_stream_id(stream_id: &str) -> Option<(&str, &str)> {

0 commit comments

Comments
 (0)