Skip to content

Commit

Permalink
fix(client): don't reuse a connection while still flushing
Browse files Browse the repository at this point in the history
A client connection that read a full response while the request body was
still flushing would see incorrect behavior, since the pool would let it
be checked out again for a new request. In debug builds, it would then
panic, but in release builds it would intermix the 2nd request bytes
with the body of the previous request.

In practice, this only ever happens if a server replies with a full
response before reading the full request, while also choosing to not
close that connection. Most servers either wait for the full request, or
close the connection after the new response is written, so as to stop
reading.
  • Loading branch information
seanmonstar committed Sep 15, 2021
1 parent e3ab409 commit c88011d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
7 changes: 6 additions & 1 deletion src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ where
self.io.set_flush_pipeline(enabled);
}

#[cfg(test)]
pub(crate) fn set_write_strategy_queue(&mut self) {
self.io.set_write_strategy_queue();
}

pub(crate) fn set_max_buf_size(&mut self, max: usize) {
self.io.set_max_buf_size(max);
}
Expand Down Expand Up @@ -461,7 +466,7 @@ where
}
}
match self.state.writing {
Writing::Init => true,
Writing::Init => self.io.can_headers_buf(),
_ => false,
}
}
Expand Down
29 changes: 28 additions & 1 deletion src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,6 @@ mod tests {

// Block at 0 for now, but we will release this response before
// the request is ready to write later...
//let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
let (mut tx, rx) = crate::client::dispatch::channel();
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
Expand All @@ -692,6 +691,34 @@ mod tests {
});
}

#[tokio::test]
async fn client_flushing_is_not_ready_for_next_request() {
let _ = pretty_env_logger::try_init();

let (io, _handle) = tokio_test::io::Builder::new()
.write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
.read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
.wait(std::time::Duration::from_secs(2))
.build_with_handle();

let (mut tx, rx) = crate::client::dispatch::channel();
let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
conn.set_write_strategy_queue();

let dispatcher = Dispatcher::new(Client::new(rx), conn);
let _dispatcher = tokio::spawn(async move { dispatcher.await });

let req = crate::Request::builder()
.method("POST")
.body(crate::Body::from("reee"))
.unwrap();

let res = tx.try_send(req).unwrap().await.expect("response");
drop(res);

assert!(!tx.is_ready());
}

#[tokio::test]
async fn body_empty_chunks_ignored() {
let _ = pretty_env_logger::try_init();
Expand Down
16 changes: 15 additions & 1 deletion src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,18 @@ where
}

#[cfg(feature = "server")]
pub(crate) fn set_write_strategy_flatten(&mut self) {
fn set_write_strategy_flatten(&mut self) {
// this should always be called only at construction time,
// so this assert is here to catch myself
debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
self.write_buf.set_strategy(WriteStrategy::Flatten);
}

#[cfg(test)]
pub(crate) fn set_write_strategy_queue(&mut self) {
self.write_buf.set_strategy(WriteStrategy::Queue);
}

pub(crate) fn read_buf(&self) -> &[u8] {
self.read_buf.as_ref()
}
Expand All @@ -121,6 +126,15 @@ where
self.read_buf.capacity() - self.read_buf.len()
}

/// Return whether we can append to the headers buffer.
///
/// Reasons we can't:
/// - The write buf is in queue mode, and some of the past body is still
/// needing to be flushed.
pub(crate) fn can_headers_buf(&self) -> bool {
!self.write_buf.queue.has_remaining()
}

pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
let buf = self.write_buf.headers_mut();
&mut buf.bytes
Expand Down

0 comments on commit c88011d

Please sign in to comment.