Skip to content

Commit 40e5f4b

Browse files
committed
Add sid FfiRequest, streamline async sid()
1 parent f07fbb0 commit 40e5f4b

File tree

6 files changed

+108
-54
lines changed

6 files changed

+108
-54
lines changed

livekit-ffi/protocol/ffi.proto

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,18 @@ message FfiRequest {
6767
UpdateLocalNameRequest update_local_name = 10;
6868
GetSessionStatsRequest get_session_stats = 11;
6969
PublishTranscriptionRequest publish_transcription = 12;
70+
GetRoomSidRequest get_room_sid = 13;
7071

7172
// Track
72-
CreateVideoTrackRequest create_video_track = 13;
73-
CreateAudioTrackRequest create_audio_track = 14;
74-
GetStatsRequest get_stats = 15;
73+
CreateVideoTrackRequest create_video_track = 14;
74+
CreateAudioTrackRequest create_audio_track = 15;
75+
GetStatsRequest get_stats = 16;
7576

7677
// Video
77-
NewVideoStreamRequest new_video_stream = 16;
78-
NewVideoSourceRequest new_video_source = 17;
79-
CaptureVideoFrameRequest capture_video_frame = 18;
80-
VideoConvertRequest video_convert = 19;
78+
NewVideoStreamRequest new_video_stream = 17;
79+
NewVideoSourceRequest new_video_source = 18;
80+
CaptureVideoFrameRequest capture_video_frame = 19;
81+
VideoConvertRequest video_convert = 20;
8182

8283
// Audio
8384
NewAudioStreamRequest new_audio_stream = 22;
@@ -105,17 +106,18 @@ message FfiResponse {
105106
UpdateLocalNameResponse update_local_name = 10;
106107
GetSessionStatsResponse get_session_stats = 11;
107108
PublishTranscriptionResponse publish_transcription = 12;
109+
GetRoomSidResponse get_room_sid = 13;
108110

109111
// Track
110-
CreateVideoTrackResponse create_video_track = 13;
111-
CreateAudioTrackResponse create_audio_track = 14;
112-
GetStatsResponse get_stats = 15;
112+
CreateVideoTrackResponse create_video_track = 14;
113+
CreateAudioTrackResponse create_audio_track = 15;
114+
GetStatsResponse get_stats = 16;
113115

114116
// Video
115-
NewVideoStreamResponse new_video_stream = 16;
116-
NewVideoSourceResponse new_video_source = 17;
117-
CaptureVideoFrameResponse capture_video_frame = 18;
118-
VideoConvertResponse video_convert = 19;
117+
NewVideoStreamResponse new_video_stream = 17;
118+
NewVideoSourceResponse new_video_source = 18;
119+
CaptureVideoFrameResponse capture_video_frame = 19;
120+
VideoConvertResponse video_convert = 20;
119121

120122
// Audio
121123
NewAudioStreamResponse new_audio_stream = 22;
@@ -150,6 +152,7 @@ message FfiEvent {
150152
LogBatch logs = 16;
151153
GetSessionStatsCallback get_session_stats = 17;
152154
Panic panic = 18;
155+
GetRoomSidCallback get_room_sid = 19;
153156
}
154157
}
155158

livekit-ffi/protocol/room.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,16 @@ message GetSessionStatsCallback {
161161
repeated RtcStats subscriber_stats = 4;
162162
}
163163

164+
message GetRoomSidRequest {
165+
uint64 room_handle = 1;
166+
}
167+
message GetRoomSidResponse {
168+
uint64 async_id = 1;
169+
}
170+
message GetRoomSidCallback {
171+
uint64 async_id = 1;
172+
string sid = 2;
173+
}
164174

165175
//
166176
// Options

livekit-ffi/src/conversion/room.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,8 @@ impl From<proto::AudioEncoding> for AudioEncoding {
213213
}
214214

215215
impl From<&FfiRoom> for proto::RoomInfo {
216-
#[tokio::main]
217-
async fn from(value: &FfiRoom) -> Self {
216+
fn from(value: &FfiRoom) -> Self {
218217
let room = &value.inner.room;
219-
Self { sid: room.sid().await.into(), name: room.name(), metadata: room.metadata() }
218+
Self { sid: room.maybe_sid().unwrap_or_default().into(), name: room.name(), metadata: room.metadata() }
220219
}
221220
}

livekit-ffi/src/livekit.proto.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,6 +2150,26 @@ pub struct GetSessionStatsCallback {
21502150
#[prost(message, repeated, tag="4")]
21512151
pub subscriber_stats: ::prost::alloc::vec::Vec<RtcStats>,
21522152
}
2153+
#[allow(clippy::derive_partial_eq_without_eq)]
2154+
#[derive(Clone, PartialEq, ::prost::Message)]
2155+
pub struct GetRoomSidRequest {
2156+
#[prost(uint64, tag="1")]
2157+
pub room_handle: u64,
2158+
}
2159+
#[allow(clippy::derive_partial_eq_without_eq)]
2160+
#[derive(Clone, PartialEq, ::prost::Message)]
2161+
pub struct GetRoomSidResponse {
2162+
#[prost(uint64, tag="1")]
2163+
pub async_id: u64,
2164+
}
2165+
#[allow(clippy::derive_partial_eq_without_eq)]
2166+
#[derive(Clone, PartialEq, ::prost::Message)]
2167+
pub struct GetRoomSidCallback {
2168+
#[prost(uint64, tag="1")]
2169+
pub async_id: u64,
2170+
#[prost(string, tag="2")]
2171+
pub sid: ::prost::alloc::string::String,
2172+
}
21532173
//
21542174
// Options
21552175
//
@@ -2967,7 +2987,7 @@ impl AudioSourceType {
29672987
#[allow(clippy::derive_partial_eq_without_eq)]
29682988
#[derive(Clone, PartialEq, ::prost::Message)]
29692989
pub struct FfiRequest {
2970-
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 22, 23, 24, 25, 26, 27")]
2990+
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27")]
29712991
pub message: ::core::option::Option<ffi_request::Message>,
29722992
}
29732993
/// Nested message and enum types in `FfiRequest`.
@@ -2998,21 +3018,23 @@ pub mod ffi_request {
29983018
GetSessionStats(super::GetSessionStatsRequest),
29993019
#[prost(message, tag="12")]
30003020
PublishTranscription(super::PublishTranscriptionRequest),
3001-
/// Track
30023021
#[prost(message, tag="13")]
3003-
CreateVideoTrack(super::CreateVideoTrackRequest),
3022+
GetRoomSid(super::GetRoomSidRequest),
3023+
/// Track
30043024
#[prost(message, tag="14")]
3005-
CreateAudioTrack(super::CreateAudioTrackRequest),
3025+
CreateVideoTrack(super::CreateVideoTrackRequest),
30063026
#[prost(message, tag="15")]
3027+
CreateAudioTrack(super::CreateAudioTrackRequest),
3028+
#[prost(message, tag="16")]
30073029
GetStats(super::GetStatsRequest),
30083030
/// Video
3009-
#[prost(message, tag="16")]
3010-
NewVideoStream(super::NewVideoStreamRequest),
30113031
#[prost(message, tag="17")]
3012-
NewVideoSource(super::NewVideoSourceRequest),
3032+
NewVideoStream(super::NewVideoStreamRequest),
30133033
#[prost(message, tag="18")]
3014-
CaptureVideoFrame(super::CaptureVideoFrameRequest),
3034+
NewVideoSource(super::NewVideoSourceRequest),
30153035
#[prost(message, tag="19")]
3036+
CaptureVideoFrame(super::CaptureVideoFrameRequest),
3037+
#[prost(message, tag="20")]
30163038
VideoConvert(super::VideoConvertRequest),
30173039
/// Audio
30183040
#[prost(message, tag="22")]
@@ -3033,7 +3055,7 @@ pub mod ffi_request {
30333055
#[allow(clippy::derive_partial_eq_without_eq)]
30343056
#[derive(Clone, PartialEq, ::prost::Message)]
30353057
pub struct FfiResponse {
3036-
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 22, 23, 24, 25, 26, 27")]
3058+
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27")]
30373059
pub message: ::core::option::Option<ffi_response::Message>,
30383060
}
30393061
/// Nested message and enum types in `FfiResponse`.
@@ -3064,21 +3086,23 @@ pub mod ffi_response {
30643086
GetSessionStats(super::GetSessionStatsResponse),
30653087
#[prost(message, tag="12")]
30663088
PublishTranscription(super::PublishTranscriptionResponse),
3067-
/// Track
30683089
#[prost(message, tag="13")]
3069-
CreateVideoTrack(super::CreateVideoTrackResponse),
3090+
GetRoomSid(super::GetRoomSidResponse),
3091+
/// Track
30703092
#[prost(message, tag="14")]
3071-
CreateAudioTrack(super::CreateAudioTrackResponse),
3093+
CreateVideoTrack(super::CreateVideoTrackResponse),
30723094
#[prost(message, tag="15")]
3095+
CreateAudioTrack(super::CreateAudioTrackResponse),
3096+
#[prost(message, tag="16")]
30733097
GetStats(super::GetStatsResponse),
30743098
/// Video
3075-
#[prost(message, tag="16")]
3076-
NewVideoStream(super::NewVideoStreamResponse),
30773099
#[prost(message, tag="17")]
3078-
NewVideoSource(super::NewVideoSourceResponse),
3100+
NewVideoStream(super::NewVideoStreamResponse),
30793101
#[prost(message, tag="18")]
3080-
CaptureVideoFrame(super::CaptureVideoFrameResponse),
3102+
NewVideoSource(super::NewVideoSourceResponse),
30813103
#[prost(message, tag="19")]
3104+
CaptureVideoFrame(super::CaptureVideoFrameResponse),
3105+
#[prost(message, tag="20")]
30823106
VideoConvert(super::VideoConvertResponse),
30833107
/// Audio
30843108
#[prost(message, tag="22")]
@@ -3101,7 +3125,7 @@ pub mod ffi_response {
31013125
#[allow(clippy::derive_partial_eq_without_eq)]
31023126
#[derive(Clone, PartialEq, ::prost::Message)]
31033127
pub struct FfiEvent {
3104-
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18")]
3128+
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19")]
31053129
pub message: ::core::option::Option<ffi_event::Message>,
31063130
}
31073131
/// Nested message and enum types in `FfiEvent`.
@@ -3145,6 +3169,8 @@ pub mod ffi_event {
31453169
GetSessionStats(super::GetSessionStatsCallback),
31463170
#[prost(message, tag="18")]
31473171
Panic(super::Panic),
3172+
#[prost(message, tag="19")]
3173+
GetRoomSid(super::GetRoomSidCallback),
31483174
}
31493175
}
31503176
/// Stop all rooms synchronously (Do we need async here?).

livekit-ffi/src/server/requests.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,27 @@ fn on_get_session_stats(
548548
Ok(proto::GetSessionStatsResponse { async_id })
549549
}
550550

551+
fn on_get_room_sid(
552+
server: &'static FfiServer,
553+
get_room_sid: proto::GetRoomSidRequest,
554+
) -> FfiResult<proto::GetRoomSidResponse> {
555+
let ffi_room = server.retrieve_handle::<room::FfiRoom>(get_room_sid.room_handle)?.clone();
556+
let async_id = server.next_id();
557+
558+
let handle = server.async_runtime.spawn(async move {
559+
let sid = ffi_room.inner.room.sid().await;
560+
let _ = server.send_event(proto::ffi_event::Message::GetRoomSid(
561+
proto::GetRoomSidCallback {
562+
async_id,
563+
sid: sid.into(),
564+
}
565+
));
566+
});
567+
568+
server.watch_panic(handle);
569+
Ok(proto::GetRoomSidResponse { async_id })
570+
}
571+
551572
#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
552573
pub fn handle_request(
553574
server: &'static FfiServer,
@@ -636,6 +657,9 @@ pub fn handle_request(
636657
get_session_stats,
637658
)?)
638659
}
660+
proto::ffi_request::Message::GetRoomSid(get_room_sid) => {
661+
proto::ffi_response::Message::GetRoomSid(on_get_room_sid(server, get_room_sid)?)
662+
}
639663
});
640664

641665
Ok(res)

livekit/src/room/mod.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ pub(crate) struct RoomSession {
264264
rtc_engine: Arc<RtcEngine>,
265265
sid_tx: AsyncMutex<Option<oneshot::Sender<()>>>,
266266
sid_rx: AsyncMutex<oneshot::Receiver<()>>,
267-
sid: RwLock<RoomSid>,
267+
sid: RwLock<Option<RoomSid>>,
268268
name: String,
269269
info: RwLock<RoomInfo>,
270270
dispatcher: Dispatcher<RoomEvent>,
@@ -408,8 +408,6 @@ impl Room {
408408
room_task: Default::default(),
409409
});
410410

411-
inner.set_sid(room_info.sid.try_into().unwrap());
412-
413411
e2ee_manager.on_state_changed({
414412
let dispatcher = dispatcher.clone();
415413
let inner = inner.clone();
@@ -485,6 +483,10 @@ impl Room {
485483

486484
pub async fn sid(&self) -> RoomSid {
487485
let _ = self.inner.sid_rx.lock().await;
486+
self.inner.sid.read().clone().unwrap()
487+
}
488+
489+
pub fn maybe_sid(&self) -> Option<RoomSid> {
488490
self.inner.sid.read().clone()
489491
}
490492

@@ -849,7 +851,13 @@ impl RoomSession {
849851
metadata: info.metadata.clone(),
850852
});
851853
}
852-
self.set_sid(room.sid.try_into().unwrap());
854+
if self.sid.read().is_none() && !room.sid.is_empty() {
855+
let mut sid = self.sid.write();
856+
*sid = Some(room.sid.try_into().unwrap());
857+
if let Ok(mut tx) = self.sid_tx.try_lock() {
858+
let _ = tx.take().unwrap().send(());
859+
}
860+
}
853861
}
854862

855863
fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
@@ -1159,22 +1167,6 @@ impl RoomSession {
11591167
) -> Option<RemoteParticipant> {
11601168
self.remote_participants.read().get(identity).cloned()
11611169
}
1162-
1163-
fn set_sid(&self, sid: RoomSid) {
1164-
let mut guard = self.sid_tx.blocking_lock();
1165-
let maybe_tx = guard.take();
1166-
if let Some(tx) = maybe_tx {
1167-
if !tx.is_closed() && sid != "".to_string().try_into().unwrap() {
1168-
let mut s = self.sid.write();
1169-
*s = sid;
1170-
let _ = tx.send(());
1171-
// self.sid_tx isn't replaced, remains None
1172-
} else {
1173-
// Sender isn't fired, gets put back in place
1174-
guard.replace(tx);
1175-
}
1176-
}
1177-
}
11781170
}
11791171

11801172
fn unpack_stream_id(stream_id: &str) -> Option<(&str, &str)> {

0 commit comments

Comments
 (0)