Skip to content

Commit 7323190

Browse files
authored
Avoid spurious wakeups when stream capacity is not available (#661)
Fixes #628 Sometimes `poll_capacity` returns `Ready(Some(0))` - in which case caller will have no way to wait for the stream capacity to become available. The previous attempt on the fix has addressed only a part of the problem. The root cause - in a nutshell - is the race condition between the application tasks that performs stream I/O and the task that serves the underlying HTTP/2 connection. The application thread that is about to send data calls `reserve_capacity/poll_capacity`, is provided with some send capacity and proceeds to `send_data`. Meanwhile the service thread may send some buffered data and/or receive some window updates - either way the stream's effective allocated send capacity may not change, but, since the capacity still available, `send_capacity_inc` flag may be set. The sending task calls `send_data` and uses the entire allocated capacity, leaving the flag set. Next time `poll_capacity` returns `Ready(Some(0))`. This change sets the flag and dispatches the wakeup event only in cases when the effective capacity reported by `poll_capacity` actually increases.
1 parent 73bea23 commit 7323190

File tree

4 files changed

+111
-33
lines changed

4 files changed

+111
-33
lines changed

src/proto/streams/prioritize.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,11 @@ impl Prioritize {
323323
/// connection
324324
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
325325
let available = stream.send_flow.available().as_size();
326-
stream.send_flow.claim_capacity(available);
327-
// Re-assign all capacity to the connection
328-
self.assign_connection_capacity(available, stream, counts);
326+
if available > 0 {
327+
stream.send_flow.claim_capacity(available);
328+
// Re-assign all capacity to the connection
329+
self.assign_connection_capacity(available, stream, counts);
330+
}
329331
}
330332

331333
/// Reclaim just reserved capacity, not buffered capacity, and re-assign
@@ -756,17 +758,7 @@ impl Prioritize {
756758

757759
// Update the flow control
758760
tracing::trace_span!("updating stream flow").in_scope(|| {
759-
stream.send_flow.send_data(len);
760-
761-
// Decrement the stream's buffered data counter
762-
debug_assert!(stream.buffered_send_data >= len as usize);
763-
stream.buffered_send_data -= len as usize;
764-
stream.requested_send_capacity -= len;
765-
766-
// If the capacity was limited because of the
767-
// max_send_buffer_size, then consider waking
768-
// the send task again...
769-
stream.notify_if_can_buffer_more(self.max_buffer_size);
761+
stream.send_data(len, self.max_buffer_size);
770762

771763
// Assign the capacity back to the connection that
772764
// was just consumed from the stream in the previous

src/proto/streams/send.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,12 +333,7 @@ impl Send {
333333

334334
/// Current available stream send capacity
335335
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
336-
let available = stream.send_flow.available().as_size() as usize;
337-
let buffered = stream.buffered_send_data;
338-
339-
available
340-
.min(self.prioritize.max_buffer_size())
341-
.saturating_sub(buffered) as WindowSize
336+
stream.capacity(self.prioritize.max_buffer_size())
342337
}
343338

344339
pub fn poll_reset(

src/proto/streams/stream.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -264,35 +264,65 @@ impl Stream {
264264
self.ref_count == 0 && !self.state.is_closed()
265265
}
266266

267+
/// Current available stream send capacity
268+
pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
269+
let available = self.send_flow.available().as_size() as usize;
270+
let buffered = self.buffered_send_data;
271+
272+
available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
273+
}
274+
267275
pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
276+
let prev_capacity = self.capacity(max_buffer_size);
268277
debug_assert!(capacity > 0);
269278
self.send_flow.assign_capacity(capacity);
270279

271280
tracing::trace!(
272-
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}",
281+
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
273282
self.send_flow.available(),
274283
self.buffered_send_data,
275284
self.id,
276-
max_buffer_size
285+
max_buffer_size,
286+
prev_capacity,
277287
);
278288

279-
self.notify_if_can_buffer_more(max_buffer_size);
289+
if prev_capacity < self.capacity(max_buffer_size) {
290+
self.notify_capacity();
291+
}
280292
}
281293

282-
/// If the capacity was limited because of the max_send_buffer_size,
283-
/// then consider waking the send task again...
284-
pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) {
285-
let available = self.send_flow.available().as_size() as usize;
286-
let buffered = self.buffered_send_data;
294+
pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
295+
let prev_capacity = self.capacity(max_buffer_size);
296+
297+
self.send_flow.send_data(len);
287298

288-
// Only notify if the capacity exceeds the amount of buffered data
289-
if available.min(max_buffer_size) > buffered {
290-
self.send_capacity_inc = true;
291-
tracing::trace!(" notifying task");
292-
self.notify_send();
299+
// Decrement the stream's buffered data counter
300+
debug_assert!(self.buffered_send_data >= len as usize);
301+
self.buffered_send_data -= len as usize;
302+
self.requested_send_capacity -= len;
303+
304+
tracing::trace!(
305+
" sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
306+
self.send_flow.available(),
307+
self.buffered_send_data,
308+
self.id,
309+
max_buffer_size,
310+
prev_capacity,
311+
);
312+
313+
if prev_capacity < self.capacity(max_buffer_size) {
314+
self.notify_capacity();
293315
}
294316
}
295317

318+
/// If the capacity was limited because of the max_send_buffer_size,
319+
/// then consider waking the send task again...
320+
pub fn notify_capacity(&mut self) {
321+
self.send_capacity_inc = true;
322+
tracing::trace!(" notifying task");
323+
self.notify_send();
324+
}
325+
296326
/// Returns `Err` when the decrement cannot be completed due to overflow.
297327
pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
298328
match self.content_length {

tests/h2-tests/tests/flow_control.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1797,3 +1797,64 @@ async fn max_send_buffer_size_poll_capacity_wakes_task() {
17971797

17981798
join(srv, client).await;
17991799
}
1800+
1801+
#[tokio::test]
1802+
async fn poll_capacity_wakeup_after_window_update() {
1803+
h2_support::trace_init!();
1804+
let (io, mut srv) = mock::new();
1805+
1806+
let srv = async move {
1807+
let settings = srv
1808+
.assert_client_handshake_with_settings(frames::settings().initial_window_size(10))
1809+
.await;
1810+
assert_default_settings!(settings);
1811+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
1812+
.await;
1813+
srv.send_frame(frames::headers(1).response(200)).await;
1814+
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
1815+
srv.send_frame(frames::window_update(1, 5)).await;
1816+
srv.send_frame(frames::window_update(1, 5)).await;
1817+
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
1818+
srv.recv_frame(frames::data(1, &b""[..]).eos()).await;
1819+
};
1820+
1821+
let h2 = async move {
1822+
let (mut client, mut h2) = client::Builder::new()
1823+
.max_send_buffer_size(5)
1824+
.handshake::<_, Bytes>(io)
1825+
.await
1826+
.unwrap();
1827+
let request = Request::builder()
1828+
.method(Method::POST)
1829+
.uri("https://www.example.com/")
1830+
.body(())
1831+
.unwrap();
1832+
1833+
let (response, mut stream) = client.send_request(request, false).unwrap();
1834+
1835+
let response = h2.drive(response).await.unwrap();
1836+
assert_eq!(response.status(), StatusCode::OK);
1837+
1838+
stream.send_data("abcde".into(), false).unwrap();
1839+
1840+
stream.reserve_capacity(10);
1841+
assert_eq!(stream.capacity(), 0);
1842+
1843+
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
1844+
h2.drive(idle_ms(10)).await;
1845+
stream.send_data("abcde".into(), false).unwrap();
1846+
1847+
stream.reserve_capacity(5);
1848+
assert_eq!(stream.capacity(), 0);
1849+
1850+
// This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity.
1851+
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
1852+
1853+
stream.send_data("".into(), true).unwrap();
1854+
1855+
// Wait for the connection to close
1856+
h2.await.unwrap();
1857+
};
1858+
1859+
join(srv, h2).await;
1860+
}

0 commit comments

Comments
 (0)