Skip to content

Commit e455fa2

Browse files
committed
fix(client): fix connection leak when Response finishes before Request body
If the Response was received and the body finished while the Request body was still streaming, the connection could get into a state where it was never polled again, thus never re-inserting into the connection pool or being dropped. Closes #1717
1 parent 2e7250b commit e455fa2

File tree

3 files changed

+91
-3
lines changed

3 files changed

+91
-3
lines changed

src/proto/h1/conn.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,7 @@ impl State {
794794
match (&self.reading, &self.writing) {
795795
(&Reading::KeepAlive, &Writing::KeepAlive) => {
796796
if let KA::Busy = self.keep_alive.status() {
797-
self.idle();
797+
self.idle::<T>();
798798
} else {
799799
trace!("try_keep_alive({}): could keep-alive, but status = {:?}", T::LOG, self.keep_alive);
800800
self.close();
@@ -819,12 +819,23 @@ impl State {
819819
self.keep_alive.busy();
820820
}
821821

822-
fn idle(&mut self) {
822+
fn idle<T: Http1Transaction>(&mut self) {
823+
debug_assert!(!self.is_idle(), "State::idle() called while idle");
824+
823825
self.method = None;
824826
self.keep_alive.idle();
825827
if self.is_idle() {
826828
self.reading = Reading::Init;
827829
self.writing = Writing::Init;
830+
831+
// !T::should_read_first() means Client.
832+
//
833+
// If Client connection has just gone idle, the Dispatcher
834+
// should try the poll loop one more time, so as to poll the
835+
// pending requests stream.
836+
if !T::should_read_first() {
837+
self.notify_read = true;
838+
}
828839
} else {
829840
self.close();
830841
}

src/proto/h1/dispatch.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ where
466466
// user has dropped sender handle
467467
Ok(Async::Ready(None))
468468
},
469-
Ok(Async::NotReady) => return Ok(Async::NotReady),
469+
Ok(Async::NotReady) => Ok(Async::NotReady),
470470
Err(never) => match never {},
471471
}
472472
}

tests/client.rs

+77
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,83 @@ mod dispatch_impl {
12151215
assert_eq!(connects.load(Ordering::Relaxed), 2);
12161216
}
12171217

1218+
#[test]
1219+
fn client_keep_alive_when_response_before_request_body_ends() {
1220+
use futures_timer::Delay;
1221+
let _ = pretty_env_logger::try_init();
1222+
let server = TcpListener::bind("127.0.0.1:0").unwrap();
1223+
let addr = server.local_addr().unwrap();
1224+
let mut rt = Runtime::new().unwrap();
1225+
1226+
let connector = DebugConnector::new();
1227+
let connects = connector.connects.clone();
1228+
1229+
let client = Client::builder()
1230+
.build(connector);
1231+
1232+
let (tx1, rx1) = oneshot::channel();
1233+
let (tx2, rx2) = oneshot::channel();
1234+
let (tx3, rx3) = oneshot::channel();
1235+
thread::spawn(move || {
1236+
let mut sock = server.accept().unwrap().0;
1237+
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
1238+
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
1239+
let mut buf = [0; 4096];
1240+
sock.read(&mut buf).expect("read 1");
1241+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1");
1242+
// after writing the response, THEN stream the body
1243+
let _ = tx1.send(());
1244+
1245+
sock.read(&mut buf).expect("read 2");
1246+
let _ = tx2.send(());
1247+
1248+
let n2 = sock.read(&mut buf).expect("read 3");
1249+
assert_ne!(n2, 0);
1250+
let second_get = "GET /b HTTP/1.1\r\n";
1251+
assert_eq!(s(&buf[..second_get.len()]), second_get);
1252+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2");
1253+
let _ = tx3.send(());
1254+
});
1255+
1256+
1257+
assert_eq!(connects.load(Ordering::Relaxed), 0);
1258+
1259+
let delayed_body = rx1
1260+
.map_err(|_| -> hyper::Error { panic!("rx1") })
1261+
.and_then(|_| Delay::new(Duration::from_millis(200)).map_err(|_| panic!("delay")))
1262+
.into_stream()
1263+
.map(|_| "hello a");
1264+
1265+
let rx = rx2.expect("thread panicked");
1266+
let req = Request::builder()
1267+
.method("POST")
1268+
.uri(&*format!("http://{}/a", addr))
1269+
.body(Body::wrap_stream(delayed_body))
1270+
.unwrap();
1271+
let client2 = client.clone();
1272+
1273+
// req 1
1274+
let fut = client.request(req)
1275+
.join(rx)
1276+
.and_then(|_| Delay::new(Duration::from_millis(200)).expect("delay"))
1277+
// req 2
1278+
.and_then(move |()| {
1279+
let rx = rx3.expect("thread panicked");
1280+
let req = Request::builder()
1281+
.uri(&*format!("http://{}/b", addr))
1282+
.body(Body::empty())
1283+
.unwrap();
1284+
client2
1285+
.request(req)
1286+
.join(rx)
1287+
.map(|_| ())
1288+
});
1289+
1290+
rt.block_on(fut).unwrap();
1291+
1292+
assert_eq!(connects.load(Ordering::Relaxed), 1);
1293+
}
1294+
12181295
#[test]
12191296
fn connect_proxy_sends_absolute_uri() {
12201297
let _ = pretty_env_logger::try_init();

0 commit comments

Comments
 (0)