Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer_connection: Implement state changes after spec and fix race #598

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
peer_connection: Initialize PeerConnectionInternal correctly
PeerConnectionInternal was initialized with default transports and then
mutated right after. This resulted in create_ice_transport() using the
default DTLS transport instance which again led to state transition
reading DTLS transport state from the wrong instance.
  • Loading branch information
haaspors committed Aug 2, 2024
commit 974e9ec3e9919e235abe7b152914fc03db535da8
161 changes: 79 additions & 82 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,24 @@ impl PeerConnectionInternal {
stats_interceptor: Arc<stats::StatsInterceptor>,
mut configuration: RTCConfiguration,
) -> Result<(Arc<Self>, RTCConfiguration)> {
let mut pc = PeerConnectionInternal {
// Create the ice gatherer
let ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ICE transport
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&ice_gatherer)));

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
let dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&ice_transport), certificates)?);

// Create the SCTP transport
let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&dtls_transport))?);

let pc = Arc::new(PeerConnectionInternal {
greater_mid: AtomicIsize::new(-1),
sdp_origin: Mutex::new(Default::default()),
last_offer: Mutex::new("".to_owned()),
Expand All @@ -89,16 +106,16 @@ impl PeerConnectionInternal {
is_negotiation_needed: Arc::new(AtomicBool::new(false)),
negotiation_needed_state: Arc::new(AtomicU8::new(NegotiationNeededState::Empty as u8)),
signaling_state: Arc::new(AtomicU8::new(RTCSignalingState::Stable as u8)),
ice_transport: Arc::new(Default::default()),
dtls_transport: Arc::new(Default::default()),
ice_transport,
dtls_transport,
ice_connection_state: Arc::new(AtomicU8::new(RTCIceConnectionState::New as u8)),
sctp_transport: Arc::new(Default::default()),
sctp_transport,
rtp_transceivers: Arc::new(Default::default()),
on_track_handler: Arc::new(ArcSwapOption::empty()),
on_signaling_state_change_handler: ArcSwapOption::empty(),
on_ice_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
on_data_channel_handler: Arc::new(Default::default()),
ice_gatherer: Arc::new(Default::default()),
ice_gatherer,
current_local_description: Arc::new(Default::default()),
current_remote_description: Arc::new(Default::default()),
pending_local_description: Arc::new(Default::default()),
Expand All @@ -114,39 +131,77 @@ impl PeerConnectionInternal {
stats_interceptor,
on_peer_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
pending_remote_description: Arc::new(Default::default()),
};

// Create the ice gatherer
pc.ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ice transport
pc.ice_transport = pc.create_ice_transport(api).await;
});

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
pc.dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?);
// Wire up the ice transport connection state change handler
let ice_connection_state = Arc::clone(&pc.ice_connection_state);
let peer_connection_state = Arc::clone(&pc.peer_connection_state);
let is_closed = Arc::clone(&pc.is_closed);
let dtls_transport = Arc::clone(&pc.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&pc.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&pc.on_peer_connection_state_change_handler);

pc.ice_transport.on_connection_state_change(Box::new(
move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

// Create the SCTP transport
pc.sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?);
let dtls_transport = Arc::clone(&dtls_transport);
let ice_connection_state = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed = Arc::clone(&is_closed);
let peer_connection_state = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler,
&ice_connection_state,
cs,
)
.await;

let dtls_transport_state = dtls_transport.state();
RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler,
&is_closed,
&peer_connection_state,
cs,
dtls_transport_state,
)
.await;
})
},
));

// Wire up the on datachannel handler
let on_data_channel_handler = Arc::clone(&pc.on_data_channel_handler);
pc.sctp_transport
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let on_data_channel_handler2 = Arc::clone(&on_data_channel_handler);
let on_data_channel_handler = Arc::clone(&on_data_channel_handler);
Box::pin(async move {
if let Some(handler) = &*on_data_channel_handler2.load() {
if let Some(handler) = &*on_data_channel_handler.load() {
let mut f = handler.lock().await;
f(d).await;
}
})
}));

Ok((Arc::new(pc), configuration))
Ok((pc, configuration))
}

pub(super) async fn start_rtp(
Expand Down Expand Up @@ -1141,64 +1196,6 @@ impl PeerConnectionInternal {
}
}

pub(super) async fn create_ice_transport(&self, api: &API) -> Arc<RTCIceTransport> {
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&self.ice_gatherer)));

let ice_connection_state = Arc::clone(&self.ice_connection_state);
let peer_connection_state = Arc::clone(&self.peer_connection_state);
let is_closed = Arc::clone(&self.is_closed);
let dtls_transport = Arc::clone(&self.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&self.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&self.on_peer_connection_state_change_handler);

ice_transport.on_connection_state_change(Box::new(move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

let dtls_transport = Arc::clone(&dtls_transport);
let ice_connection_state2 = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler2 =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler2 =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed2 = Arc::clone(&is_closed);
let peer_connection_state2 = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler2,
&ice_connection_state2,
cs,
)
.await;

let dtls_transport_state = dtls_transport.state();
RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler2,
&is_closed2,
&peer_connection_state2,
cs,
dtls_transport_state,
)
.await;
})
}));

ice_transport
}

/// has_local_description_changed returns whether local media (rtp_transceivers) has changed
/// caller of this method should hold `pc.mu` lock
pub(super) async fn has_local_description_changed(&self, desc: &RTCSessionDescription) -> bool {
Expand Down
Loading