Skip to content

Commit 7310969

Browse files
committed
fix(h1): fix hung streaming bodies over HTTPS
1 parent 0c290c3 commit 7310969

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

src/proto/h1/io.rs

+22-6
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ where
158158
debug!("flushed {} bytes", n);
159159
if self.write_buf.remaining() == 0 {
160160
break;
161+
} else if n == 0 {
162+
trace!("write returned zero, but {} bytes remaining", self.write_buf.remaining());
163+
return Err(io::ErrorKind::WriteZero.into())
161164
}
162165
}
163166
try_nb!(self.io.flush())
@@ -391,8 +394,20 @@ impl<B: Buf> Buf for VecOrBuf<B> {
391394
#[inline]
392395
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
393396
match *self {
394-
VecOrBuf::Vec(ref v) => v.bytes_vec(dst),
395-
VecOrBuf::Buf(ref b) => b.bytes_vec(dst),
397+
VecOrBuf::Vec(ref v) => {
398+
if v.has_remaining() {
399+
v.bytes_vec(dst)
400+
} else {
401+
0
402+
}
403+
},
404+
VecOrBuf::Buf(ref b) => {
405+
if b.has_remaining() {
406+
b.bytes_vec(dst)
407+
} else {
408+
0
409+
}
410+
},
396411
}
397412
}
398413
}
@@ -420,11 +435,12 @@ impl<T: Buf> Buf for BufDeque<T> {
420435

421436
#[inline]
422437
fn bytes(&self) -> &[u8] {
423-
if let Some(buf) = self.bufs.front() {
424-
buf.bytes()
425-
} else {
426-
&[]
438+
for buf in &self.bufs {
439+
if buf.has_remaining() {
440+
return buf.bytes();
441+
}
427442
}
443+
&[]
428444
}
429445

430446
#[inline]

tests/client.rs

+39-2
Original file line numberDiff line numberDiff line change
@@ -1006,8 +1006,6 @@ mod dispatch_impl {
10061006
assert_eq!(err.description(), "event loop gone");
10071007
}
10081008

1009-
1010-
10111009
#[test]
10121010
fn client_custom_executor() {
10131011
let server = TcpListener::bind("127.0.0.1:0").unwrap();
@@ -1046,6 +1044,45 @@ mod dispatch_impl {
10461044
assert_eq!(closes.load(Ordering::Relaxed), 1);
10471045
}
10481046

1047+
#[test]
1048+
fn client_body_mpsc() {
1049+
use futures::Sink;
1050+
let _ = pretty_env_logger::try_init();
1051+
let server = TcpListener::bind("127.0.0.1:0").unwrap();
1052+
let addr = server.local_addr().unwrap();
1053+
let mut core = Core::new().unwrap();
1054+
let handle = core.handle();
1055+
let closes = Arc::new(AtomicUsize::new(0));
1056+
1057+
let (tx1, rx1) = oneshot::channel();
1058+
1059+
thread::spawn(move || {
1060+
let mut sock = server.accept().unwrap().0;
1061+
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
1062+
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
1063+
let mut buf = [0; 4096];
1064+
sock.read(&mut buf).expect("read 1");
1065+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
1066+
let _ = tx1.send(());
1067+
});
1068+
1069+
let uri = format!("http://{}/a", addr).parse().unwrap();
1070+
1071+
let client = Client::configure()
1072+
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
1073+
.build(&handle);
1074+
let mut req = Request::new(Method::Post, uri);
1075+
let (tx, body) = hyper::Body::pair();
1076+
req.set_body(body);
1077+
let res = client.request(req).and_then(move |res| {
1078+
assert_eq!(res.status(), hyper::StatusCode::Ok);
1079+
res.body().concat2()
1080+
});
1081+
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
1082+
let send = tx.send_all(::futures::stream::iter_ok(vec!["hello"; 2]).map(hyper::Chunk::from).map(Ok)).then(|_| Ok(()));
1083+
core.run(res.join(send).join(rx).map(|r| r.0)).unwrap();
1084+
}
1085+
10491086
struct DebugConnector(HttpConnector, Arc<AtomicUsize>);
10501087

10511088
impl Service for DebugConnector {

0 commit comments

Comments
 (0)