Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-api: patch
livekit-ffi: patch
---

Send publisher offer with join request to accelerate connection - #996 (@cnderrauber)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions livekit-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ signal-client-tokio = [
"dep:reqwest",
"dep:livekit-runtime",
"livekit-runtime/tokio",
"dep:base64"
"dep:base64",
"dep:flate2"
]

signal-client-async = [
Expand All @@ -36,7 +37,8 @@ __signal-client-async-compatible = [
"dep:futures-util",
"dep:isahc",
"dep:livekit-runtime",
"dep:base64"
"dep:base64",
"dep:flate2"
]


Expand Down Expand Up @@ -101,6 +103,7 @@ http = "1.1"
reqwest = { version = "0.12", default-features = false, features = [ "json" ], optional = true }
isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true }

flate2 = { version = "1", optional = true }
scopeguard = "1.2.0"
rand = { workspace = true }
os_info = "3.14.0"
Expand Down
81 changes: 62 additions & 19 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
use std::{
borrow::Cow,
fmt::Debug,
io::Write,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine};
use flate2::{write::GzEncoder, Compression};
use http::StatusCode;
use livekit_protocol as proto;
use livekit_runtime::{interval, sleep, Instant, JoinHandle};
Expand Down Expand Up @@ -157,6 +159,7 @@ impl SignalClient {
url: &str,
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> {
let handle_success = |inner: Arc<SignalInner>, join_response, stream_events| {
let (emitter, events) = mpsc::unbounded_channel();
Expand All @@ -166,7 +169,7 @@ impl SignalClient {
(Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events)
};

match SignalInner::connect(url, token, options.clone()).await {
match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()).await {
Ok((inner, join_response, stream_events)) => {
Ok(handle_success(inner, join_response, stream_events))
}
Expand All @@ -180,7 +183,9 @@ impl SignalClient {

for url in urls.iter() {
log::info!("fallback connection to: {}", url);
match SignalInner::connect(url, token, options.clone()).await {
match SignalInner::connect(url, token, options.clone(), publisher_offer.clone())
.await
{
Ok((inner, join_response, stream_events)) => {
return Ok(handle_success(inner, join_response, stream_events))
}
Expand Down Expand Up @@ -263,6 +268,7 @@ impl SignalInner {
url: &str,
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
) -> SignalResult<(
Arc<Self>,
proto::JoinResponse,
Expand All @@ -271,7 +277,8 @@ impl SignalInner {
// Try v1 path first if single_peer_connection is enabled
let use_v1_path = options.single_peer_connection;
// For initial connection: reconnect=false, reconnect_reason=None, participant_sid=""
let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?;
let lk_url =
get_livekit_url(url, &options, use_v1_path, false, None, "", publisher_offer.as_ref())?;
// Try to connect to the SignalClient
let (stream, mut events, single_pc_mode_active) =
match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await {
Expand Down Expand Up @@ -301,7 +308,8 @@ impl SignalInner {
matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404);

if use_v1_path && is_not_found {
let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?;
let lk_url_v0 =
get_livekit_url(url, &options, false, false, None, "", None)?;
log::warn!("v1 path not found (404), falling back to v0 path");
match SignalStream::connect(
lk_url_v0.clone(),
Expand Down Expand Up @@ -397,9 +405,17 @@ impl SignalInner {
// For reconnects: reconnect=true, participant_sid=sid
// For v1 path: reconnect and sid are encoded in the join_request protobuf
// For v0 path: reconnect and sid are added as separate query parameters
let lk_url =
get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid)
.unwrap();
// No publisher offer for reconnections
let lk_url = get_livekit_url(
&self.url,
&self.options,
self.single_pc_mode_active,
true,
None,
sid,
None,
)
.unwrap();

let (new_stream, mut events) =
SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?;
Expand Down Expand Up @@ -556,6 +572,7 @@ fn create_join_request_param(
participant_sid: &str,
os: String,
os_version: String,
publisher_offer: Option<&proto::SessionDescription>,
) -> String {
let connection_settings = proto::ConnectionSettings {
auto_subscribe: options.auto_subscribe,
Expand All @@ -576,6 +593,7 @@ fn create_join_request_param(
client_info: Some(client_info),
connection_settings: Some(connection_settings),
reconnect,
publisher_offer: publisher_offer.cloned(),
..Default::default()
};

Expand All @@ -592,13 +610,29 @@ fn create_join_request_param(
// Serialize JoinRequest to bytes
let join_request_bytes = join_request.encode_to_vec();

// Create WrappedJoinRequest (JS doesn't explicitly set compression, so default is NONE)
// Use gzip compression when publisher offer is included (SDP makes payload large)
let (compressed_bytes, compression) = if publisher_offer.is_some() {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
if encoder.write_all(&join_request_bytes).is_ok() {
if let Ok(compressed) = encoder.finish() {
(compressed, proto::wrapped_join_request::Compression::Gzip as i32)
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
};

let wrapped_join_request =
proto::WrappedJoinRequest { join_request: join_request_bytes, ..Default::default() };
proto::WrappedJoinRequest { join_request: compressed_bytes, compression };

// Serialize WrappedJoinRequest to bytes and base64 encode
// Serialize WrappedJoinRequest to bytes and base64url encode
// (URL-safe base64 avoids percent-encoding issues in query parameters)
let wrapped_bytes = wrapped_join_request.encode_to_vec();
BASE64_STANDARD.encode(&wrapped_bytes)
BASE64_URL_SAFE.encode(&wrapped_bytes)
}

/// Build the LiveKit WebSocket URL for connection
Expand All @@ -617,6 +651,7 @@ fn get_livekit_url(
reconnect: bool,
reconnect_reason: Option<i32>,
participant_sid: &str,
publisher_offer: Option<&proto::SessionDescription>,
) -> SignalResult<url::Url> {
let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?;

Expand Down Expand Up @@ -651,6 +686,7 @@ fn get_livekit_url(
participant_sid,
os_info.os_type().to_string(),
os_info.version().to_string(),
publisher_offer,
);
lk_url.query_pairs_mut().append_pair("join_request", &join_request_param);
} else {
Expand Down Expand Up @@ -754,32 +790,39 @@ mod tests {
fn livekit_url_test() {
let io = SignalOptions::default();

assert!(get_livekit_url("localhost:7880", &io, false, false, None, "").is_err());
assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None).is_err());
assert_eq!(
get_livekit_url("https://localhost:7880", &io, false, false, None, "")
get_livekit_url("https://localhost:7880", &io, false, false, None, "", None)
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("http://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("http://localhost:7880", &io, false, false, None, "", None)
.unwrap()
.scheme(),
"ws"
);
assert_eq!(
get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None)
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("ws://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None)
.unwrap()
.scheme(),
"ws"
);
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "").is_err());
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err());
}

#[test]
fn validate_url_test() {
let io = SignalOptions::default();
let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap();
let lk_url =
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap();
let validate_url = get_validate_url(lk_url);

// Should be /rtc/validate, not /rtc/rtc/validate
Expand Down
3 changes: 2 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,8 @@ impl RoomSession {
let stream_id = stream.id();
let lk_stream_id = unpack_stream_id(&stream_id);
if lk_stream_id.is_none() {
log::error!("received track with an invalid track_id: {:?}", &stream_id);
// server could require extra media sections to accelerate subscription.
log::debug!("received track with an invalid track_id: {:?}", &stream_id);
return;
}

Expand Down
39 changes: 39 additions & 0 deletions livekit/src/rtc_engine/peer_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct TransportInner {
single_pc_mode: bool,
// Publish-side target bitrate (bps) for offer munging
max_send_bitrate_bps: Option<u64>,
pending_initial_offer: Option<SessionDescription>,
}

pub struct PeerTransport {
Expand Down Expand Up @@ -64,6 +65,7 @@ impl PeerTransport {
restarting_ice: false,
single_pc_mode,
max_send_bitrate_bps: None,
pending_initial_offer: None,
})),
}
}
Expand Down Expand Up @@ -108,6 +110,10 @@ impl PeerTransport {
) -> EngineResult<()> {
let mut inner = self.inner.lock().await;

if let Some(pending_offer) = inner.pending_initial_offer.take() {
self.peer_connection.set_local_description(pending_offer).await?;
}

self.peer_connection.set_remote_description(remote_description).await?;

for ic in inner.pending_candidates.drain(..) {
Expand Down Expand Up @@ -136,6 +142,34 @@ impl PeerTransport {
Ok(answer)
}

/// Create an initial offer without setting it as local description.
/// The offer is stored as pending and will be applied when the server's answer arrives.
pub async fn create_initial_offer(&self) -> EngineResult<Option<SessionDescription>> {
let mut inner = self.inner.lock().await;
if !inner.single_pc_mode {
return Ok(None);
}

let offer = self.peer_connection.create_offer(OfferOptions::default()).await?;
let sdp = offer.to_string();

let recvonly_munged = Self::munge_inactive_to_recvonly_for_media(&sdp);
if recvonly_munged != sdp {
if let Ok(parsed) = SessionDescription::parse(&recvonly_munged, offer.sdp_type()) {
inner.pending_initial_offer = Some(parsed.clone());
return Ok(Some(parsed));
}
}

inner.pending_initial_offer = Some(offer.clone());
Ok(Some(offer))
}

pub async fn clear_pending_initial_offer(&self) {
let mut inner = self.inner.lock().await;
inner.pending_initial_offer = None;
}

pub async fn set_max_send_bitrate_bps(&self, bps: Option<u64>) {
let mut inner = self.inner.lock().await;
inner.max_send_bitrate_bps = bps;
Expand Down Expand Up @@ -333,6 +367,11 @@ impl PeerTransport {
inner.restarting_ice = true;
}

if inner.pending_initial_offer.is_some() {
inner.renegotiate = true;
return Ok(());
}

if self.peer_connection.signaling_state() == SignalingState::HaveLocalOffer {
let remote_sdp = self.peer_connection.current_remote_description();
if options.ice_restart && remote_sdp.is_some() {
Expand Down
Loading
Loading