Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer_connection: Implement state changes after spec and fix race #598

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,14 +884,21 @@ impl RTCPeerConnection {
// Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected"
// state and none of them are in the "failed" or "connecting" or "checking" state.
RTCPeerConnectionState::Disconnected
} else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Closed) &&
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Closed) {
// None of the previous states apply and all RTCIceTransports are in the "new" or "closed" state,
// and all RTCDtlsTransports are in the "new" or "closed" state, or there are no transports.
RTCPeerConnectionState::New
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Checking) ||
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Connecting) {
// None of the previous states apply and any RTCIceTransport is in the "new" or "checking" state or
// any RTCDtlsTransport is in the "new" or "connecting" state.
RTCPeerConnectionState::Connecting
} else if (ice_connection_state == RTCIceConnectionState::Connected || ice_connection_state == RTCIceConnectionState::Completed || ice_connection_state == RTCIceConnectionState::Closed) &&
(dtls_transport_state == RTCDtlsTransportState::Connected || dtls_transport_state == RTCDtlsTransportState::Closed) {
// All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed"
// state and at least one of them is in the "connected" or "completed" state.
// state and all RTCDtlsTransports are in the "connected" or "closed" state.
RTCPeerConnectionState::Connected
} else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
// Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or
// "checking" state and none of them is in the "failed" state.
RTCPeerConnectionState::Connecting
} else {
RTCPeerConnectionState::New
};
Expand Down
160 changes: 79 additions & 81 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,24 @@ impl PeerConnectionInternal {
stats_interceptor: Arc<stats::StatsInterceptor>,
mut configuration: RTCConfiguration,
) -> Result<(Arc<Self>, RTCConfiguration)> {
let mut pc = PeerConnectionInternal {
// Create the ice gatherer
let ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ICE transport
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&ice_gatherer)));

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
let dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&ice_transport), certificates)?);

// Create the SCTP transport
let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&dtls_transport))?);

let pc = Arc::new(PeerConnectionInternal {
greater_mid: AtomicIsize::new(-1),
sdp_origin: Mutex::new(Default::default()),
last_offer: Mutex::new("".to_owned()),
Expand All @@ -89,16 +106,16 @@ impl PeerConnectionInternal {
is_negotiation_needed: Arc::new(AtomicBool::new(false)),
negotiation_needed_state: Arc::new(AtomicU8::new(NegotiationNeededState::Empty as u8)),
signaling_state: Arc::new(AtomicU8::new(RTCSignalingState::Stable as u8)),
ice_transport: Arc::new(Default::default()),
dtls_transport: Arc::new(Default::default()),
ice_transport,
dtls_transport,
ice_connection_state: Arc::new(AtomicU8::new(RTCIceConnectionState::New as u8)),
sctp_transport: Arc::new(Default::default()),
sctp_transport,
rtp_transceivers: Arc::new(Default::default()),
on_track_handler: Arc::new(ArcSwapOption::empty()),
on_signaling_state_change_handler: ArcSwapOption::empty(),
on_ice_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
on_data_channel_handler: Arc::new(Default::default()),
ice_gatherer: Arc::new(Default::default()),
ice_gatherer,
current_local_description: Arc::new(Default::default()),
current_remote_description: Arc::new(Default::default()),
pending_local_description: Arc::new(Default::default()),
Expand All @@ -114,39 +131,77 @@ impl PeerConnectionInternal {
stats_interceptor,
on_peer_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
pending_remote_description: Arc::new(Default::default()),
};

// Create the ice gatherer
pc.ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ice transport
pc.ice_transport = pc.create_ice_transport(api).await;
});

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
pc.dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?);
// Wire up the ice transport connection state change handler
let ice_connection_state = Arc::clone(&pc.ice_connection_state);
let peer_connection_state = Arc::clone(&pc.peer_connection_state);
let is_closed = Arc::clone(&pc.is_closed);
let dtls_transport = Arc::clone(&pc.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&pc.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&pc.on_peer_connection_state_change_handler);

pc.ice_transport.on_connection_state_change(Box::new(
move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

// Create the SCTP transport
pc.sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?);
let dtls_transport = Arc::clone(&dtls_transport);
let ice_connection_state = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed = Arc::clone(&is_closed);
let peer_connection_state = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler,
&ice_connection_state,
cs,
)
.await;

let dtls_transport_state = dtls_transport.state();
RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler,
&is_closed,
&peer_connection_state,
cs,
dtls_transport_state,
)
.await;
})
},
));

// Wire up the on datachannel handler
let on_data_channel_handler = Arc::clone(&pc.on_data_channel_handler);
pc.sctp_transport
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let on_data_channel_handler2 = Arc::clone(&on_data_channel_handler);
let on_data_channel_handler = Arc::clone(&on_data_channel_handler);
Box::pin(async move {
if let Some(handler) = &*on_data_channel_handler2.load() {
if let Some(handler) = &*on_data_channel_handler.load() {
let mut f = handler.lock().await;
f(d).await;
}
})
}));

Ok((Arc::new(pc), configuration))
Ok((pc, configuration))
}

pub(super) async fn start_rtp(
Expand Down Expand Up @@ -1141,63 +1196,6 @@ impl PeerConnectionInternal {
}
}

pub(super) async fn create_ice_transport(&self, api: &API) -> Arc<RTCIceTransport> {
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&self.ice_gatherer)));

let ice_connection_state = Arc::clone(&self.ice_connection_state);
let peer_connection_state = Arc::clone(&self.peer_connection_state);
let is_closed = Arc::clone(&self.is_closed);
let dtls_transport = Arc::clone(&self.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&self.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&self.on_peer_connection_state_change_handler);

ice_transport.on_connection_state_change(Box::new(move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

let ice_connection_state2 = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler2 =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler2 =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed2 = Arc::clone(&is_closed);
let dtls_transport_state = dtls_transport.state();
let peer_connection_state2 = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler2,
&ice_connection_state2,
cs,
)
.await;

RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler2,
&is_closed2,
&peer_connection_state2,
cs,
dtls_transport_state,
)
.await;
})
}));

ice_transport
}

/// has_local_description_changed returns whether local media (rtp_transceivers) has changed
/// caller of this method should hold `pc.mu` lock
pub(super) async fn has_local_description_changed(&self, desc: &RTCSessionDescription) -> bool {
Expand Down
105 changes: 105 additions & 0 deletions webrtc/src/peer_connection/peer_connection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,3 +635,108 @@ async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_peer_connection_state() -> Result<()> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();
let pc = api.new_peer_connection(RTCConfiguration::default()).await?;

assert_eq!(pc.connection_state(), RTCPeerConnectionState::New);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Checking,
RTCDtlsTransportState::New,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::New,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Connecting,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Completed,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Closed,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Disconnected,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Disconnected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Failed,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Failed,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);

pc.close().await?;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Closed);

Ok(())
}
Loading