Skip to content

Commit 318ad8a

Browse files
authored
Merge pull request #406 from smoltcp-rs/rto
tcp: Add RTT estimation.
2 parents 2aede17 + 5117af7 commit 318ad8a

File tree

2 files changed

+152
-32
lines changed

2 files changed

+152
-32
lines changed

src/socket/tcp.rs

Lines changed: 144 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,99 @@ impl fmt::Display for State {
5454
}
5555
}
5656

57+
// Conservative initial RTT estimate.
58+
const RTTE_INITIAL_RTT: u32 = 300;
59+
const RTTE_INITIAL_DEV: u32 = 100;
60+
61+
// Minimum "safety margin" for the RTO that kicks in when the
62+
// variance gets very low.
63+
const RTTE_MIN_MARGIN: u32 = 5;
64+
65+
const RTTE_MIN_RTO: u32 = 10;
66+
const RTTE_MAX_RTO: u32 = 10000;
67+
68+
#[derive(Debug, Clone, Copy)]
69+
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
70+
struct RttEstimator {
71+
// Using u32 instead of Duration to save space (Duration is i64)
72+
rtt: u32,
73+
deviation: u32,
74+
timestamp: Option<(Instant, TcpSeqNumber)>,
75+
max_seq_sent: Option<TcpSeqNumber>,
76+
rto_count: u8,
77+
}
78+
79+
impl Default for RttEstimator {
80+
fn default() -> Self {
81+
Self {
82+
rtt: RTTE_INITIAL_RTT,
83+
deviation: RTTE_INITIAL_DEV,
84+
timestamp: None,
85+
max_seq_sent: None,
86+
rto_count: 0,
87+
}
88+
}
89+
}
90+
91+
impl RttEstimator {
92+
fn retransmission_timeout(&self) -> Duration {
93+
let margin = RTTE_MIN_MARGIN.max(self.deviation * 4);
94+
let ms = (self.rtt + margin).max(RTTE_MIN_RTO).min(RTTE_MAX_RTO);
95+
Duration::from_millis(ms as u64)
96+
}
97+
98+
fn sample(&mut self, new_rtt: u32) {
99+
// "Congestion Avoidance and Control", Van Jacobson, Michael J. Karels, 1988
100+
self.rtt = (self.rtt * 7 + new_rtt + 7) / 8;
101+
let diff = (self.rtt as i32 - new_rtt as i32 ).abs() as u32;
102+
self.deviation = (self.deviation * 3 + diff + 3) / 4;
103+
104+
self.rto_count = 0;
105+
106+
let rto = self.retransmission_timeout().millis();
107+
net_trace!("rtte: sample={:?} rtt={:?} dev={:?} rto={:?}", new_rtt, self.rtt, self.deviation, rto);
108+
}
109+
110+
fn on_send(&mut self, timestamp: Instant, seq: TcpSeqNumber) {
111+
if self.max_seq_sent.map(|max_seq_sent| seq > max_seq_sent).unwrap_or(true) {
112+
self.max_seq_sent = Some(seq);
113+
if self.timestamp.is_none() {
114+
self.timestamp = Some((timestamp, seq));
115+
net_trace!("rtte: sampling at seq={:?}", seq);
116+
}
117+
}
118+
}
119+
120+
fn on_ack(&mut self, timestamp: Instant, seq: TcpSeqNumber) {
121+
if let Some((sent_timestamp, sent_seq)) = self.timestamp {
122+
if seq >= sent_seq {
123+
self.sample((timestamp - sent_timestamp).millis() as u32);
124+
self.timestamp = None;
125+
}
126+
}
127+
}
128+
129+
fn on_retransmit(&mut self) {
130+
if self.timestamp.is_some() {
131+
net_trace!("rtte: abort sampling due to retransmit");
132+
}
133+
self.timestamp = None;
134+
self.rto_count = self.rto_count.saturating_add(1);
135+
if self.rto_count >= 3 {
136+
// This happens in 2 scenarios:
137+
// - The RTT is higher than the initial estimate
138+
// - The network conditions change, suddenly making the RTT much higher
139+
// In these cases, the estimator can get stuck, because it can't sample because
140+
// all packets sent would incur a retransmit. To avoid this, force an estimate
141+
// increase if we see 3 consecutive retransmissions without any successful sample.
142+
self.rto_count = 0;
143+
self.rtt *= 2;
144+
let rto = self.retransmission_timeout().millis();
145+
net_trace!("rtte: too many retransmissions, increasing: rtt={:?} dev={:?} rto={:?}", self.rtt, self.deviation, rto);
146+
}
147+
}
148+
}
149+
57150
#[derive(Debug, Clone, Copy, PartialEq)]
58151
enum Timer {
59152
Idle {
@@ -69,7 +162,6 @@ enum Timer {
69162
}
70163
}
71164

72-
const RETRANSMIT_DELAY: Duration = Duration { millis: 100 };
73165
const CLOSE_DELAY: Duration = Duration { millis: 10_000 };
74166

75167
impl Default for Timer {
@@ -140,12 +232,12 @@ impl Timer {
140232
}
141233
}
142234

143-
fn set_for_retransmit(&mut self, timestamp: Instant) {
235+
fn set_for_retransmit(&mut self, timestamp: Instant, delay: Duration) {
144236
match *self {
145237
Timer::Idle { .. } | Timer::FastRetransmit { .. } => {
146238
*self = Timer::Retransmit {
147-
expires_at: timestamp + RETRANSMIT_DELAY,
148-
delay: RETRANSMIT_DELAY,
239+
expires_at: timestamp + delay,
240+
delay: delay,
149241
}
150242
}
151243
Timer::Retransmit { expires_at, delay }
@@ -189,6 +281,7 @@ pub struct TcpSocket<'a> {
189281
pub(crate) meta: SocketMeta,
190282
state: State,
191283
timer: Timer,
284+
rtte: RttEstimator,
192285
assembler: Assembler,
193286
rx_buffer: SocketBuffer<'a>,
194287
rx_fin_received: bool,
@@ -279,6 +372,7 @@ impl<'a> TcpSocket<'a> {
279372
meta: SocketMeta::default(),
280373
state: State::Closed,
281374
timer: Timer::default(),
375+
rtte: RttEstimator::default(),
282376
assembler: Assembler::new(rx_buffer.capacity()),
283377
tx_buffer: tx_buffer,
284378
rx_buffer: rx_buffer,
@@ -463,6 +557,7 @@ impl<'a> TcpSocket<'a> {
463557

464558
self.state = State::Closed;
465559
self.timer = Timer::default();
560+
self.rtte = RttEstimator::default();
466561
self.assembler = Assembler::new(self.rx_buffer.capacity());
467562
self.tx_buffer.clear();
468563
self.rx_buffer.clear();
@@ -1154,6 +1249,8 @@ impl<'a> TcpSocket<'a> {
11541249
self.meta.handle, self.local_endpoint, self.remote_endpoint);
11551250
ack_of_fin = true;
11561251
}
1252+
1253+
self.rtte.on_ack(timestamp, ack_number);
11571254
}
11581255
}
11591256

@@ -1538,6 +1635,7 @@ impl<'a> TcpSocket<'a> {
15381635
self.meta.handle, self.local_endpoint, self.remote_endpoint,
15391636
retransmit_delta);
15401637
self.remote_last_seq = self.local_seq_no;
1638+
self.rtte.on_retransmit();
15411639
}
15421640
}
15431641

@@ -1723,10 +1821,14 @@ impl<'a> TcpSocket<'a> {
17231821
self.remote_last_ack = repr.ack_number;
17241822
self.remote_last_win = repr.window_len;
17251823

1824+
if repr.segment_len() > 0 {
1825+
self.rtte.on_send(timestamp, repr.seq_number + repr.segment_len());
1826+
}
1827+
17261828
if !self.seq_to_transmit() && repr.segment_len() > 0 {
17271829
// If we've transmitted all data we could (and there was something at all,
17281830
// data or flag, to transmit, not just an ACK), wind up the retransmit timer.
1729-
self.timer.set_for_retransmit(timestamp);
1831+
self.timer.set_for_retransmit(timestamp, self.rtte.retransmission_timeout());
17301832
}
17311833

17321834
if self.state == State::Closed {
@@ -3646,7 +3748,7 @@ mod test {
36463748
..RECV_TEMPL
36473749
}));
36483750
recv!(s, time 1050, Err(Error::Exhausted));
3649-
recv!(s, time 1100, Ok(TcpRepr {
3751+
recv!(s, time 2000, Ok(TcpRepr {
36503752
seq_number: LOCAL_SEQ + 1,
36513753
ack_number: Some(REMOTE_SEQ + 1),
36523754
payload: &b"abcdef"[..],
@@ -3678,21 +3780,21 @@ mod test {
36783780

36793781
recv!(s, time 50, Err(Error::Exhausted));
36803782

3681-
recv!(s, time 100, Ok(TcpRepr {
3783+
recv!(s, time 1000, Ok(TcpRepr {
36823784
control: TcpControl::None,
36833785
seq_number: LOCAL_SEQ + 1,
36843786
ack_number: Some(REMOTE_SEQ + 1),
36853787
payload: &b"abcdef"[..],
36863788
..RECV_TEMPL
36873789
}), exact);
3688-
recv!(s, time 150, Ok(TcpRepr {
3790+
recv!(s, time 1500, Ok(TcpRepr {
36893791
control: TcpControl::Psh,
36903792
seq_number: LOCAL_SEQ + 1 + 6,
36913793
ack_number: Some(REMOTE_SEQ + 1),
36923794
payload: &b"012345"[..],
36933795
..RECV_TEMPL
36943796
}), exact);
3695-
recv!(s, time 200, Err(Error::Exhausted));
3797+
recv!(s, time 1550, Err(Error::Exhausted));
36963798
}
36973799

36983800
#[test]
@@ -3705,7 +3807,7 @@ mod test {
37053807
max_seg_size: Some(BASE_MSS),
37063808
..RECV_TEMPL
37073809
}));
3708-
recv!(s, time 150, Ok(TcpRepr { // retransmit
3810+
recv!(s, time 750, Ok(TcpRepr { // retransmit
37093811
control: TcpControl::Syn,
37103812
seq_number: LOCAL_SEQ,
37113813
ack_number: Some(REMOTE_SEQ + 1),
@@ -4527,9 +4629,9 @@ mod test {
45274629
#[test]
45284630
fn test_established_timeout() {
45294631
let mut s = socket_established();
4530-
s.set_timeout(Some(Duration::from_millis(200)));
4632+
s.set_timeout(Some(Duration::from_millis(1000)));
45314633
recv!(s, time 250, Err(Error::Exhausted));
4532-
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(450)));
4634+
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(1250)));
45334635
s.send_slice(b"abcdef").unwrap();
45344636
assert_eq!(s.poll_at(), PollAt::Now);
45354637
recv!(s, time 255, Ok(TcpRepr {
@@ -4538,15 +4640,15 @@ mod test {
45384640
payload: &b"abcdef"[..],
45394641
..RECV_TEMPL
45404642
}));
4541-
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(355)));
4542-
recv!(s, time 355, Ok(TcpRepr {
4643+
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(955)));
4644+
recv!(s, time 955, Ok(TcpRepr {
45434645
seq_number: LOCAL_SEQ + 1,
45444646
ack_number: Some(REMOTE_SEQ + 1),
45454647
payload: &b"abcdef"[..],
45464648
..RECV_TEMPL
45474649
}));
4548-
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(455)));
4549-
recv!(s, time 500, Ok(TcpRepr {
4650+
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(1255)));
4651+
recv!(s, time 1255, Ok(TcpRepr {
45504652
control: TcpControl::Rst,
45514653
seq_number: LOCAL_SEQ + 1 + 6,
45524654
ack_number: Some(REMOTE_SEQ + 1),
@@ -4596,15 +4698,14 @@ mod test {
45964698
#[test]
45974699
fn test_fin_wait_1_timeout() {
45984700
let mut s = socket_fin_wait_1();
4599-
s.set_timeout(Some(Duration::from_millis(200)));
4701+
s.set_timeout(Some(Duration::from_millis(1000)));
46004702
recv!(s, time 100, Ok(TcpRepr {
46014703
control: TcpControl::Fin,
46024704
seq_number: LOCAL_SEQ + 1,
46034705
ack_number: Some(REMOTE_SEQ + 1),
46044706
..RECV_TEMPL
46054707
}));
4606-
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(200)));
4607-
recv!(s, time 400, Ok(TcpRepr {
4708+
recv!(s, time 1100, Ok(TcpRepr {
46084709
control: TcpControl::Rst,
46094710
seq_number: LOCAL_SEQ + 1 + 1,
46104711
ack_number: Some(REMOTE_SEQ + 1),
@@ -4616,15 +4717,14 @@ mod test {
46164717
#[test]
46174718
fn test_last_ack_timeout() {
46184719
let mut s = socket_last_ack();
4619-
s.set_timeout(Some(Duration::from_millis(200)));
4720+
s.set_timeout(Some(Duration::from_millis(1000)));
46204721
recv!(s, time 100, Ok(TcpRepr {
46214722
control: TcpControl::Fin,
46224723
seq_number: LOCAL_SEQ + 1,
46234724
ack_number: Some(REMOTE_SEQ + 1 + 1),
46244725
..RECV_TEMPL
46254726
}));
4626-
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(200)));
4627-
recv!(s, time 400, Ok(TcpRepr {
4727+
recv!(s, time 1100, Ok(TcpRepr {
46284728
control: TcpControl::Rst,
46294729
seq_number: LOCAL_SEQ + 1 + 1,
46304730
ack_number: Some(REMOTE_SEQ + 1 + 1),
@@ -5052,13 +5152,14 @@ mod test {
50525152

50535153
#[test]
50545154
fn test_timer_retransmit() {
5155+
const RTO: Duration = Duration::from_millis(100);
50555156
let mut r = Timer::default();
50565157
assert_eq!(r.should_retransmit(Instant::from_secs(1)), None);
5057-
r.set_for_retransmit(Instant::from_millis(1000));
5158+
r.set_for_retransmit(Instant::from_millis(1000), RTO);
50585159
assert_eq!(r.should_retransmit(Instant::from_millis(1000)), None);
50595160
assert_eq!(r.should_retransmit(Instant::from_millis(1050)), None);
50605161
assert_eq!(r.should_retransmit(Instant::from_millis(1101)), Some(Duration::from_millis(101)));
5061-
r.set_for_retransmit(Instant::from_millis(1101));
5162+
r.set_for_retransmit(Instant::from_millis(1101), RTO);
50625163
assert_eq!(r.should_retransmit(Instant::from_millis(1101)), None);
50635164
assert_eq!(r.should_retransmit(Instant::from_millis(1150)), None);
50645165
assert_eq!(r.should_retransmit(Instant::from_millis(1200)), None);
@@ -5067,4 +5168,23 @@ mod test {
50675168
assert_eq!(r.should_retransmit(Instant::from_millis(1350)), None);
50685169
}
50695170

5171+
#[test]
5172+
fn test_rtt_estimator() {
5173+
#[cfg(feature = "log")]
5174+
init_logger();
5175+
5176+
let mut r = RttEstimator::default();
5177+
5178+
let rtos = &[
5179+
751, 766, 755, 731, 697, 656, 613, 567,
5180+
523, 484, 445, 411, 378, 350, 322, 299,
5181+
280, 261, 243, 229, 215, 206, 197, 188
5182+
];
5183+
5184+
for &rto in rtos {
5185+
r.sample(100);
5186+
assert_eq!(r.retransmission_timeout(), Duration::from_millis(rto));
5187+
}
5188+
}
5189+
50705190
}

src/time.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,19 @@ impl Instant {
5050

5151
/// The fractional number of milliseconds that have passed
5252
/// since the beginning of time.
53-
pub fn millis(&self) -> i64 {
53+
pub const fn millis(&self) -> i64 {
5454
self.millis % 1000
5555
}
5656

5757
/// The number of whole seconds that have passed since the
5858
/// beginning of time.
59-
pub fn secs(&self) -> i64 {
59+
pub const fn secs(&self) -> i64 {
6060
self.millis / 1000
6161
}
6262

6363
/// The total number of milliseconds that have passed since
6464
/// the biginning of time.
65-
pub fn total_millis(&self) -> i64 {
65+
pub const fn total_millis(&self) -> i64 {
6666
self.millis
6767
}
6868
}
@@ -141,27 +141,27 @@ pub struct Duration {
141141

142142
impl Duration {
143143
/// Create a new `Duration` from a number of milliseconds.
144-
pub fn from_millis(millis: u64) -> Duration {
144+
pub const fn from_millis(millis: u64) -> Duration {
145145
Duration { millis }
146146
}
147147

148148
/// Create a new `Instant` from a number of seconds.
149-
pub fn from_secs(secs: u64) -> Duration {
149+
pub const fn from_secs(secs: u64) -> Duration {
150150
Duration { millis: secs * 1000 }
151151
}
152152

153153
/// The fractional number of milliseconds in this `Duration`.
154-
pub fn millis(&self) -> u64 {
154+
pub const fn millis(&self) -> u64 {
155155
self.millis % 1000
156156
}
157157

158158
/// The number of whole seconds in this `Duration`.
159-
pub fn secs(&self) -> u64 {
159+
pub const fn secs(&self) -> u64 {
160160
self.millis / 1000
161161
}
162162

163163
/// The total number of milliseconds in this `Duration`.
164-
pub fn total_millis(&self) -> u64 {
164+
pub const fn total_millis(&self) -> u64 {
165165
self.millis
166166
}
167167
}

0 commit comments

Comments
 (0)