Skip to content

Commit

Permalink
webrtc: Add support for unordered unreliable data channels (#609)
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulOlteanu authored Sep 7, 2024
1 parent 76f780a commit 8d95518
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 44 deletions.
4 changes: 2 additions & 2 deletions webrtc/src/data_channel/data_channel_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub struct DataChannelParameters {
pub label: String,
pub protocol: String,
pub ordered: bool,
pub max_packet_life_time: u16,
pub max_retransmits: u16,
pub max_packet_life_time: Option<u16>,
pub max_retransmits: Option<u16>,
pub negotiated: Option<u16>,
}
113 changes: 107 additions & 6 deletions webrtc/src/data_channel/data_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async fn test_data_channel_parameters_max_packet_life_time_exchange() -> Result<
);
assert_eq!(
dc.max_packet_lifetime(),
max_packet_life_time,
Some(max_packet_life_time),
"should match"
);

Expand All @@ -394,7 +394,7 @@ async fn test_data_channel_parameters_max_packet_life_time_exchange() -> Result<
);
assert_eq!(
d.max_packet_lifetime(),
max_packet_life_time,
Some(max_packet_life_time),
"should match"
);
let done_tx2 = Arc::clone(&done_tx);
Expand Down Expand Up @@ -428,7 +428,7 @@ async fn test_data_channel_parameters_max_retransmits_exchange() -> Result<()> {

// Check if parameters are correctly set
assert!(!dc.ordered(), "Ordered should be set to false");
assert_eq!(dc.max_retransmits(), max_retransmits, "should match");
assert_eq!(dc.max_retransmits(), Some(max_retransmits), "should match");

let done_tx = Arc::new(Mutex::new(Some(done_tx)));
answer_pc.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
Expand All @@ -440,7 +440,58 @@ async fn test_data_channel_parameters_max_retransmits_exchange() -> Result<()> {

// Check if parameters are correctly set
assert!(!d.ordered(), "Ordered should be set to false");
assert_eq!(max_retransmits, d.max_retransmits(), "should match");
assert_eq!(Some(max_retransmits), d.max_retransmits(), "should match");
let done_tx2 = Arc::clone(&done_tx);
Box::pin(async move {
let mut done = done_tx2.lock().await;
done.take();
})
}));

close_reliability_param_test(&mut offer_pc, &mut answer_pc, done_rx).await?;

Ok(())
}

#[tokio::test]
async fn test_data_channel_parameters_unreliable_unordered_exchange() -> Result<()> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();

let ordered = false;
let max_retransmits = Some(0);
let max_packet_life_time = None;
let options = RTCDataChannelInit {
ordered: Some(ordered),
max_retransmits,
max_packet_life_time,
..Default::default()
};

let (mut offer_pc, mut answer_pc, dc, done_tx, done_rx) =
set_up_data_channel_parameters_test(&api, Some(options)).await?;

// Check if parameters are correctly set
assert_eq!(
dc.ordered(),
ordered,
"Ordered should be same value as set in DataChannelInit"
);
assert_eq!(dc.max_retransmits, max_retransmits, "should match");

let done_tx = Arc::new(Mutex::new(Some(done_tx)));
answer_pc.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
if d.label() != EXPECTED_LABEL {
return Box::pin(async {});
}
// Check if parameters are correctly set
assert_eq!(
d.ordered(),
ordered,
"Ordered should be same value as set in DataChannelInit"
);
assert_eq!(d.max_retransmits(), max_retransmits, "should match");
let done_tx2 = Arc::clone(&done_tx);
Box::pin(async move {
let mut done = done_tx2.lock().await;
Expand All @@ -453,6 +504,56 @@ async fn test_data_channel_parameters_max_retransmits_exchange() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_data_channel_parameters_reliable_unordered_exchange() -> Result<()> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();

let ordered = false;
let max_retransmits = None;
let max_packet_life_time = None;
let options = RTCDataChannelInit {
ordered: Some(ordered),
max_retransmits,
max_packet_life_time,
..Default::default()
};

let (mut offer_pc, mut answer_pc, dc, done_tx, done_rx) =
set_up_data_channel_parameters_test(&api, Some(options)).await?;

// Check if parameters are correctly set
assert_eq!(
dc.ordered(),
ordered,
"Ordered should be same value as set in DataChannelInit"
);
assert_eq!(dc.max_retransmits, max_retransmits, "should match");

let done_tx = Arc::new(Mutex::new(Some(done_tx)));
answer_pc.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
if d.label() != EXPECTED_LABEL {
return Box::pin(async {});
}
// Check if parameters are correctly set
assert_eq!(
d.ordered(),
ordered,
"Ordered should be same value as set in DataChannelInit"
);
assert_eq!(d.max_retransmits(), max_retransmits, "should match");
let done_tx2 = Arc::clone(&done_tx);
Box::pin(async move {
let mut done = done_tx2.lock().await;
done.take();
})
}));

close_reliability_param_test(&mut offer_pc, &mut answer_pc, done_rx).await?;

Ok(())
}
#[tokio::test]
async fn test_data_channel_parameters_protocol_exchange() -> Result<()> {
let mut m = MediaEngine::default();
Expand Down Expand Up @@ -743,7 +844,7 @@ async fn test_data_channel_parameters_go() -> Result<()> {
// Check if parameters are correctly set
assert!(dc.ordered(), "Ordered should be set to true");
assert_eq!(
max_packet_life_time,
Some(max_packet_life_time),
dc.max_packet_lifetime(),
"should match"
);
Expand All @@ -759,7 +860,7 @@ async fn test_data_channel_parameters_go() -> Result<()> {
// Check if parameters are correctly set
assert!(d.ordered, "Ordered should be set to true");
assert_eq!(
max_packet_life_time,
Some(max_packet_life_time),
d.max_packet_lifetime(),
"should match"
);
Expand Down
50 changes: 28 additions & 22 deletions webrtc/src/data_channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub struct RTCDataChannel {
pub(crate) stats_id: String,
pub(crate) label: String,
pub(crate) ordered: bool,
pub(crate) max_packet_lifetime: u16,
pub(crate) max_retransmits: u16,
pub(crate) max_packet_lifetime: Option<u16>,
pub(crate) max_retransmits: Option<u16>,
pub(crate) protocol: String,
pub(crate) negotiated: bool,
pub(crate) id: AtomicU16,
Expand Down Expand Up @@ -137,26 +137,32 @@ impl RTCDataChannel {
let channel_type;
let reliability_parameter;

if self.max_packet_lifetime == 0 && self.max_retransmits == 0 {
reliability_parameter = 0u32;
if self.ordered {
channel_type = ChannelType::Reliable;
} else {
channel_type = ChannelType::ReliableUnordered;
match (self.max_retransmits, self.max_packet_lifetime) {
(None, None) => {
reliability_parameter = 0u32;
if self.ordered {
channel_type = ChannelType::Reliable;
} else {
channel_type = ChannelType::ReliableUnordered;
}
}
} else if self.max_retransmits != 0 {
reliability_parameter = self.max_retransmits as u32;
if self.ordered {
channel_type = ChannelType::PartialReliableRexmit;
} else {
channel_type = ChannelType::PartialReliableRexmitUnordered;

(Some(max_retransmits), _) => {
reliability_parameter = max_retransmits as u32;
if self.ordered {
channel_type = ChannelType::PartialReliableRexmit;
} else {
channel_type = ChannelType::PartialReliableRexmitUnordered;
}
}
} else {
reliability_parameter = self.max_packet_lifetime as u32;
if self.ordered {
channel_type = ChannelType::PartialReliableTimed;
} else {
channel_type = ChannelType::PartialReliableTimedUnordered;

(None, Some(max_packet_lifetime)) => {
reliability_parameter = max_packet_lifetime as u32;
if self.ordered {
channel_type = ChannelType::PartialReliableTimed;
} else {
channel_type = ChannelType::PartialReliableTimedUnordered;
}
}
}

Expand Down Expand Up @@ -454,13 +460,13 @@ impl RTCDataChannel {

/// max_packet_lifetime represents the length of the time window (msec) during
/// which transmissions and retransmissions may occur in unreliable mode.
pub fn max_packet_lifetime(&self) -> u16 {
pub fn max_packet_lifetime(&self) -> Option<u16> {
self.max_packet_lifetime
}

/// max_retransmits represents the maximum number of retransmissions that are
/// attempted in unreliable mode.
pub fn max_retransmits(&self) -> u16 {
pub fn max_retransmits(&self) -> Option<u16> {
self.max_retransmits
}

Expand Down
10 changes: 3 additions & 7 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1849,14 +1849,10 @@ impl RTCPeerConnection {
}

// https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #7)
if let Some(max_packet_life_time) = options.max_packet_life_time {
params.max_packet_life_time = max_packet_life_time;
}
params.max_packet_life_time = options.max_packet_life_time;

// https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #8)
if let Some(max_retransmits) = options.max_retransmits {
params.max_retransmits = max_retransmits;
}
params.max_retransmits = options.max_retransmits;

// https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #10)
if let Some(protocol) = options.protocol {
Expand All @@ -1878,7 +1874,7 @@ impl RTCPeerConnection {
));

// https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #16)
if d.max_packet_lifetime != 0 && d.max_retransmits != 0 {
if d.max_packet_lifetime.is_some() && d.max_retransmits.is_some() {
return Err(Error::ErrRetransmitsOrPacketLifeTime);
}

Expand Down
14 changes: 7 additions & 7 deletions webrtc/src/sctp_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ impl RTCSctpTransport {
}
};

let mut max_retransmits = 0;
let mut max_packet_lifetime = 0;
let mut max_retransmits = None;
let mut max_packet_life_time = None;
let val = dc.config.reliability_parameter as u16;
let ordered;

Expand All @@ -258,19 +258,19 @@ impl RTCSctpTransport {
}
ChannelType::PartialReliableRexmit => {
ordered = true;
max_retransmits = val;
max_retransmits = Some(val);
}
ChannelType::PartialReliableRexmitUnordered => {
ordered = false;
max_retransmits = val;
max_retransmits = Some(val);
}
ChannelType::PartialReliableTimed => {
ordered = true;
max_packet_lifetime = val;
max_packet_life_time = Some(val);
}
ChannelType::PartialReliableTimedUnordered => {
ordered = false;
max_packet_lifetime = val;
max_packet_life_time = Some(val);
}
};

Expand All @@ -285,7 +285,7 @@ impl RTCSctpTransport {
protocol: dc.config.protocol.clone(),
negotiated,
ordered,
max_packet_life_time: max_packet_lifetime,
max_packet_life_time,
max_retransmits,
},
Arc::clone(&param.setting_engine),
Expand Down

0 comments on commit 8d95518

Please sign in to comment.