@@ -375,9 +375,42 @@ where
375375 return ;
376376 } ;
377377
378+ // Scale the max_active window down by a factor of
379+ // (F-1) / F, where F = WINDOW_UPDATE_FRACTION.
380+ //
381+ // In the ideal case, each byte sent would trigger a flow control
382+ // update. However, in practice we only send an update for every
383+ // 1 / F of the window. Thus, when not application limited, in a
384+ // steady state transfer it takes 1 RTT after sending 1 / F of the
385+ // window for the sender to receive the next update. The sender is
386+ // effectively limited to (F-1) / F of the window per RTT.
387+ //
388+ // By calculating with this effective window instead of the full
389+ // max_active, we account for the inherent delay between when the sender
390+ // would ideally receive flow control updates and when they actually
391+ // arrive due to our batched update strategy.
392+ //
393+ // Example with F=4 without adjustment:
394+ //
395+ // t=0 start sending
396+ // t=RTT/4 sent 1/4 of window total
397+ // t=RTT sent 1 window total
398+ // sender blocked for RTT/4
399+ // t=RTT+RTT/4 receive update for 1/4 of window
400+ //
401+ // Example with F=4 with adjustment:
402+ //
403+ // t=0 start sending
404+ // t=RTT/4 sent 1/4 of window total
405+ // t=RTT sent 1 window total
406+ // t=RTT+RTT/4 sent 1+1/4 window total; receive update for 1/4 of window (just in time)
407+ let effective_window =
408+ ( self . max_active * ( WINDOW_UPDATE_FRACTION - 1 ) ) / ( WINDOW_UPDATE_FRACTION ) ;
409+
378410 // Compute the amount of bytes we have received in excess
379411 // of what the effective window might allow in the elapsed time.
380- let window_bytes_expected = self . max_active * elapsed / rtt;
412+ let window_bytes_expected = ( effective_window * elapsed) / ( rtt) ;
413+
381414 let window_bytes_used = self . max_active - ( self . max_allowed - self . retired ) ;
382415 let Some ( excess) = window_bytes_used. checked_sub ( window_bytes_expected) else {
383416 // Used below expected. No auto-tuning needed.
@@ -1201,6 +1234,7 @@ mod test {
12011234 /// Allow auto-tuning algorithm to be off from actual bandwidth-delay
12021235 /// product by up to 1KiB.
12031236 const TOLERANCE : u64 = 1024 ;
1237+ const BW_TOLERANCE : f64 = 0.8 ;
12041238
12051239 test_fixture:: fixture_init ( ) ;
12061240
@@ -1211,6 +1245,7 @@ mod test {
12111245 u64:: from ( u16:: from_be_bytes ( random :: < 2 > ( ) ) % 1_000 + 1 ) * 1_000 * 1_000 ;
12121246 // Random delay between 1 ms and 256 ms.
12131247 let rtt = Duration :: from_millis ( u64:: from ( random :: < 1 > ( ) [ 0 ] ) + 1 ) ;
1248+ let half_rtt = rtt / 2 ;
12141249 let bdp = bandwidth * u64:: try_from ( rtt. as_millis ( ) ) . unwrap ( ) / 1_000 / 8 ;
12151250
12161251 let mut now = test_fixture:: now ( ) ;
@@ -1225,6 +1260,11 @@ mod test {
12251260 let mut fc =
12261261 ReceiverFlowControl :: new ( StreamId :: new ( 0 ) , INITIAL_LOCAL_MAX_STREAM_DATA as u64 ) ;
12271262
1263+ let mut bytes_received: u64 = 0 ;
1264+ let start_time = now;
1265+
1266+ // Track when sender can next send.
1267+ let mut next_send_time = now;
12281268 loop {
12291269 // Sender receives window updates.
12301270 if recv_to_send. front ( ) . is_some_and ( |( at, _) | * at <= now) {
@@ -1235,9 +1275,14 @@ mod test {
12351275 // Sender sends data frames.
12361276 let sender_progressed = if sender_window > 0 {
12371277 let to_send = min ( DATA_FRAME_SIZE , sender_window) ;
1238- send_to_recv. push_back ( ( now, to_send) ) ;
12391278 sender_window -= to_send;
1240- now += Duration :: from_secs_f64 ( to_send as f64 * 8.0 / bandwidth as f64 ) ;
1279+ let time_to_send =
1280+ Duration :: from_secs_f64 ( to_send as f64 * 8.0 / bandwidth as f64 ) ;
1281+
1282+ let send_start = next_send_time. max ( now) ;
1283+ next_send_time = send_start + time_to_send;
1284+
1285+ send_to_recv. push_back ( ( send_start + time_to_send + half_rtt, to_send) ) ;
12411286 true
12421287 } else {
12431288 false
@@ -1247,13 +1292,14 @@ mod test {
12471292 let mut receiver_progressed = false ;
12481293 if send_to_recv. front ( ) . is_some_and ( |( at, _) | * at <= now) {
12491294 let ( _, data) = send_to_recv. pop_front ( ) . unwrap ( ) ;
1295+ bytes_received += data;
12501296 let consumed = fc. set_consumed ( fc. retired ( ) + data) ?;
12511297 fc. add_retired ( consumed) ;
12521298
12531299 // Receiver sends window updates.
12541300 let prev_max_allowed = fc. max_allowed ;
12551301 if write_frames ( & mut fc, rtt, now) == 1 {
1256- recv_to_send. push_front ( ( now, fc. max_allowed - prev_max_allowed) ) ;
1302+ recv_to_send. push_back ( ( now + half_rtt , fc. max_allowed - prev_max_allowed) ) ;
12571303 receiver_progressed = true ;
12581304 if last_max_active < fc. max_active ( ) {
12591305 last_max_active = fc. max_active ( ) ;
@@ -1272,29 +1318,46 @@ mod test {
12721318 . expect ( "both are None" ) ;
12731319 }
12741320
1275- // Consider auto-tuning done once receive window hasn't changed for 4 RTT.
1276- if now. duration_since ( last_max_active_changed) > 4 * rtt {
1321+ // Consider auto-tuning done once the receive window hasn't changed for 8 RTTs.
1322+ // The long wait allows the observed bandwidth average to stabilize.
1323+ if now. duration_since ( last_max_active_changed) > 8 * rtt {
12771324 break ;
12781325 }
12791326 }
12801327
1328+ // See comment in [`ReceiverFlowControl::auto_tune_inner`] for an
1329+ // explanation of the effective window.
1330+ let effective_window =
1331+ ( fc. max_active ( ) * ( WINDOW_UPDATE_FRACTION - 1 ) ) / WINDOW_UPDATE_FRACTION ;
1332+ let at_max_stream_data = fc. max_active ( ) == MAX_LOCAL_MAX_STREAM_DATA ;
1333+
1334+ let observed_bw =
1335+ ( 8 * bytes_received) as f64 / now. duration_since ( start_time) . as_secs_f64 ( ) ;
12811336 let summary = format ! (
1282- "Got receive window of {} MiB on connection with bandwidth {} MBit/s ({bandwidth} Bit/s), delay {rtt:?}, bdp {} MiB." ,
1283- fc. max_active( ) / 1024 / 1024 ,
1337+ "Got receive window of {} KiB (effectively {} KiB) on connection with observed bandwidth {} MBit/s. Expected: bandwidth {} MBit/s ({bandwidth} Bit/s), rtt {rtt:?}, bdp {} KiB." ,
1338+ fc. max_active( ) / 1024 ,
1339+ effective_window / 1024 ,
1340+ observed_bw / 1_000.0 / 1_000.0 ,
12841341 bandwidth / 1_000 / 1_000 ,
1285- bdp / 1024 / 1024 ,
1342+ bdp / 1024 ,
12861343 ) ;
12871344
12881345 assert ! (
1289- fc . max_active ( ) + TOLERANCE >= bdp || fc . max_active ( ) == MAX_LOCAL_MAX_STREAM_DATA ,
1346+ effective_window + TOLERANCE >= bdp || at_max_stream_data ,
12901347 "{summary} Receive window is smaller than the bdp."
12911348 ) ;
1349+
12921350 assert ! (
1293- fc . max_active - TOLERANCE <= bdp
1351+ effective_window - TOLERANCE <= bdp
12941352 || fc. max_active == INITIAL_LOCAL_MAX_STREAM_DATA as u64 ,
12951353 "{summary} Receive window is larger than the bdp."
12961354 ) ;
12971355
1356+ assert ! (
1357+ ( bandwidth as f64 ) * BW_TOLERANCE <= observed_bw || at_max_stream_data,
1358+ "{summary} Observed bandwidth is smaller than the link rate."
1359+ ) ;
1360+
12981361 qdebug ! ( "{summary}" ) ;
12991362 }
13001363
0 commit comments