Skip to content

Commit 845c9c9

Browse files
committed
feat(h2): implement CONNECT support (fixes #2508)
1 parent 90c14ee commit 845c9c9

File tree

10 files changed

+767
-86
lines changed

10 files changed

+767
-86
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ http = "0.2"
3131
http-body = "0.4"
3232
httpdate = "1.0"
3333
httparse = "1.4"
34-
h2 = { version = "0.3", optional = true }
34+
h2 = { version = "0.3.3", optional = true }
3535
itoa = "0.4.1"
3636
tracing = { version = "0.1", default-features = false, features = ["std"] }
3737
pin-project = "1.0"

src/body/length.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@ use std::fmt;
33
#[derive(Clone, Copy, PartialEq, Eq)]
44
pub(crate) struct DecodedLength(u64);
55

6+
#[cfg(any(feature = "http1", feature = "http2"))]
7+
impl From<Option<u64>> for DecodedLength {
8+
fn from(len: Option<u64>) -> Self {
9+
len.and_then(|len| {
10+
// If the length is u64::MAX, oh well, just reported chunked.
11+
Self::checked_new(len).ok()
12+
})
13+
.unwrap_or(DecodedLength::CHUNKED)
14+
}
15+
}
16+
617
#[cfg(any(feature = "http1", feature = "http2", test))]
718
const MAX_LEN: u64 = std::u64::MAX - 2;
819

src/client/client.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,20 +243,15 @@ where
243243
.expect("uri host is valid header value")
244244
});
245245
}
246+
}
246247

247-
// CONNECT always sends authority-form, so check it first...
248-
if req.method() == Method::CONNECT {
249-
authority_form(req.uri_mut());
250-
} else if pooled.conn_info.is_proxied {
251-
absolute_form(req.uri_mut());
252-
} else {
253-
origin_form(req.uri_mut());
254-
};
255-
} else if req.method() == Method::CONNECT {
256-
debug!("client does not support CONNECT requests over HTTP2");
257-
return Err(ClientError::Normal(
258-
crate::Error::new_user_unsupported_request_method(),
259-
));
248+
// CONNECT always sends authority-form, so check it first...
249+
if req.method() == Method::CONNECT {
250+
authority_form(req.uri_mut());
251+
} else if pooled.conn_info.is_proxied {
252+
absolute_form(req.uri_mut());
253+
} else {
254+
origin_form(req.uri_mut());
260255
}
261256

262257
let fut = pooled

src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub(super) enum User {
9090
/// User tried to send a certain header in an unexpected context.
9191
///
9292
/// For example, sending both `content-length` and `transfer-encoding`.
93-
#[cfg(feature = "http1")]
93+
#[cfg(any(feature = "http1", feature = "http2"))]
9494
#[cfg(feature = "server")]
9595
UnexpectedHeader,
9696
/// User tried to create a Request with bad version.
@@ -290,7 +290,7 @@ impl Error {
290290
Error::new(Kind::User(user))
291291
}
292292

293-
#[cfg(feature = "http1")]
293+
#[cfg(any(feature = "http1", feature = "http2"))]
294294
#[cfg(feature = "server")]
295295
pub(super) fn new_user_header() -> Error {
296296
Error::new_user(User::UnexpectedHeader)
@@ -405,7 +405,7 @@ impl Error {
405405
Kind::User(User::MakeService) => "error from user's MakeService",
406406
#[cfg(any(feature = "http1", feature = "http2"))]
407407
Kind::User(User::Service) => "error from user's Service",
408-
#[cfg(feature = "http1")]
408+
#[cfg(any(feature = "http1", feature = "http2"))]
409409
#[cfg(feature = "server")]
410410
Kind::User(User::UnexpectedHeader) => "user sent unexpected header",
411411
#[cfg(any(feature = "http1", feature = "http2"))]

src/proto/h2/client.rs

Lines changed: 79 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@ use std::error::Error as StdError;
22
#[cfg(feature = "runtime")]
33
use std::time::Duration;
44

5+
use bytes::{Bytes};
56
use futures_channel::{mpsc, oneshot};
67
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
78
use futures_util::stream::StreamExt as _;
89
use h2::client::{Builder, SendRequest};
10+
use http::Method;
911
use tokio::io::{AsyncRead, AsyncWrite};
1012

11-
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
13+
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
1214
use crate::body::HttpBody;
1315
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
1416
use crate::headers;
1517
use crate::proto::Dispatched;
18+
use crate::proto::h2::UpgradedSendStream;
19+
use crate::upgrade::Upgraded;
1620
use crate::{Body, Request, Response};
1721

1822
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
@@ -233,8 +237,20 @@ where
233237
headers::set_content_length_if_missing(req.headers_mut(), len);
234238
}
235239
}
240+
241+
let is_connect = req.method() == Method::CONNECT;
236242
let eos = body.is_end_stream();
237-
let (fut, body_tx) = match self.h2_tx.send_request(req, eos) {
243+
let ping = self.ping.clone();
244+
245+
if is_connect {
246+
if headers::content_length_parse_all(req.headers()).map_or(false, |len| len != 0) {
247+
warn!("h2 connect request with non-zero body not supported");
248+
cb.send(Err((crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), None)));
249+
continue;
250+
}
251+
}
252+
253+
let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
238254
Ok(ok) => ok,
239255
Err(err) => {
240256
debug!("client send request error: {}", err);
@@ -243,45 +259,76 @@ where
243259
}
244260
};
245261

246-
let ping = self.ping.clone();
247-
if !eos {
248-
let mut pipe = Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
249-
if let Err(e) = res {
250-
debug!("client request body error: {}", e);
251-
}
252-
});
253-
254-
// eagerly see if the body pipe is ready and
255-
// can thus skip allocating in the executor
256-
match Pin::new(&mut pipe).poll(cx) {
257-
Poll::Ready(_) => (),
258-
Poll::Pending => {
259-
let conn_drop_ref = self.conn_drop_ref.clone();
260-
// keep the ping recorder's knowledge of an
261-
// "open stream" alive while this body is
262-
// still sending...
263-
let ping = ping.clone();
264-
let pipe = pipe.map(move |x| {
265-
drop(conn_drop_ref);
266-
drop(ping);
267-
x
262+
let send_stream = if !is_connect {
263+
if !eos {
264+
let mut pipe =
265+
Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
266+
if let Err(e) = res {
267+
debug!("client request body error: {}", e);
268+
}
268269
});
269-
self.executor.execute(pipe);
270+
271+
// eagerly see if the body pipe is ready and
272+
// can thus skip allocating in the executor
273+
match Pin::new(&mut pipe).poll(cx) {
274+
Poll::Ready(_) => (),
275+
Poll::Pending => {
276+
let conn_drop_ref = self.conn_drop_ref.clone();
277+
// keep the ping recorder's knowledge of an
278+
// "open stream" alive while this body is
279+
// still sending...
280+
let ping = ping.clone();
281+
let pipe = pipe.map(move |x| {
282+
drop(conn_drop_ref);
283+
drop(ping);
284+
x
285+
});
286+
self.executor.execute(pipe);
287+
}
270288
}
271289
}
272-
}
290+
291+
None
292+
} else {
293+
Some(body_tx)
294+
};
273295

274296
let fut = fut.map(move |result| match result {
275297
Ok(res) => {
276298
// record that we got the response headers
277299
ping.record_non_data();
278300

279-
let content_length = decode_content_length(res.headers());
280-
let res = res.map(|stream| {
281-
let ping = ping.for_stream(&stream);
282-
crate::Body::h2(stream, content_length, ping)
283-
});
284-
Ok(res)
301+
let content_length = headers::content_length_parse_all(res.headers());
302+
if let Some(mut send_stream) = send_stream {
303+
if content_length.map_or(false, |len| len != 0) {
304+
warn!("h2 connect response with non-zero body not supported");
305+
306+
send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
307+
return Err((crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), None));
308+
}
309+
let (parts, recv_stream) = res.into_parts();
310+
let mut res = Response::from_parts(parts, Body::empty());
311+
312+
let (pending, on_upgrade) = crate::upgrade::pending();
313+
let io = H2Upgraded {
314+
ping,
315+
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
316+
recv_stream,
317+
buf: Bytes::new(),
318+
};
319+
let upgraded = Upgraded::new(io, Bytes::new());
320+
321+
pending.fulfill(upgraded);
322+
res.extensions_mut().insert(on_upgrade);
323+
324+
Ok(res)
325+
} else {
326+
let res = res.map(|stream| {
327+
let ping = ping.for_stream(&stream);
328+
crate::Body::h2(stream, content_length.into(), ping)
329+
});
330+
Ok(res)
331+
}
285332
}
286333
Err(err) => {
287334
ping.ensure_not_timed_out().map_err(|e| (e, None))?;

0 commit comments

Comments
 (0)