Skip to content

Commit 1d7cdab

Browse files
Michael-J-Wardseanmonstar
authored andcommitted
wip: remove client::conn::Connection
1 parent 2988baa commit 1d7cdab

File tree

12 files changed

+189
-976
lines changed

12 files changed

+189
-976
lines changed

examples/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
4141
let addr = format!("{}:{}", host, port);
4242
let stream = TcpStream::connect(addr).await?;
4343

44-
let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
44+
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
4545
tokio::task::spawn(async move {
4646
if let Err(err) = conn.await {
4747
println!("Connection failed: {:?}", err);

examples/client_json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
3030

3131
let stream = TcpStream::connect(addr).await?;
3232

33-
let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
33+
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
3434
tokio::task::spawn(async move {
3535
if let Err(err) = conn.await {
3636
println!("Connection failed: {:?}", err);

examples/gateway.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4343
async move {
4444
let client_stream = TcpStream::connect(addr).await.unwrap();
4545

46-
let (mut sender, conn) = hyper::client::conn::handshake(client_stream).await?;
46+
let (mut sender, conn) =
47+
hyper::client::conn::http1::handshake(client_stream).await?;
4748
tokio::task::spawn(async move {
4849
if let Err(err) = conn.await {
4950
println!("Connection failed: {:?}", err);

examples/http_proxy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::net::SocketAddr;
44

55
use bytes::Bytes;
66
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
7-
use hyper::client::conn::Builder;
7+
use hyper::client::conn::http1::Builder;
88
use hyper::server::conn::Http;
99
use hyper::service::service_fn;
1010
use hyper::upgrade::Upgraded;

examples/upgrades.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
9797
.unwrap();
9898

9999
let stream = TcpStream::connect(addr).await?;
100-
let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
100+
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
101101

102102
tokio::task::spawn(async move {
103103
if let Err(err) = conn.await {

examples/web_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {
3131
let port = req.uri().port_u16().expect("uri has no port");
3232
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
3333

34-
let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
34+
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
3535

3636
tokio::task::spawn(async move {
3737
if let Err(err) = conn.await {

src/client/conn/http1.rs

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::error::Error as StdError;
44
use std::fmt;
55
use std::sync::Arc;
66

7+
use bytes::Bytes;
78
use http::{Request, Response};
89
use httparse::ParserConfig;
910
use tokio::io::{AsyncRead, AsyncWrite};
@@ -27,6 +28,27 @@ pub struct SendRequest<B> {
2728
dispatch: dispatch::Sender<Request<B>, Response<Recv>>,
2829
}
2930

31+
/// Deconstructed parts of a `Connection`.
32+
///
33+
/// This allows taking apart a `Connection` at a later time, in order to
34+
/// reclaim the IO object, and additional related pieces.
35+
#[derive(Debug)]
36+
pub struct Parts<T> {
37+
/// The original IO object used in the handshake.
38+
pub io: T,
39+
/// A buffer of bytes that have been read but not processed as HTTP.
40+
///
41+
/// For instance, if the `Connection` is used for an HTTP upgrade request,
42+
/// it is possible the server sent back the first bytes of the new protocol
43+
/// along with the response upgrade.
44+
///
45+
/// You will want to check for any existing bytes if you plan to continue
46+
/// communicating on the IO object.
47+
pub read_buf: Bytes,
48+
_inner: (),
49+
}
50+
51+
3052
/// A future that processes all HTTP state for the IO object.
3153
///
3254
/// In most cases, this should just be spawned into an executor, so that it
@@ -40,6 +62,40 @@ where
4062
inner: Option<Dispatcher<T, B>>,
4163
}
4264

65+
impl<T, B> Connection<T, B>
66+
where
67+
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
68+
B: Body + 'static,
69+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
70+
{
71+
/// Return the inner IO object, and additional information.
72+
///
73+
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
74+
pub fn into_parts(self) -> Parts<T> {
75+
let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
76+
Parts {
77+
io,
78+
read_buf,
79+
_inner: (),
80+
}
81+
}
82+
83+
/// Poll the connection for completion, but without calling `shutdown`
84+
/// on the underlying IO.
85+
///
86+
/// This is useful to allow running a connection while doing an HTTP
87+
/// upgrade. Once the upgrade is completed, the connection would be "done",
88+
/// but it is not desired to actually shutdown the IO object. Instead you
89+
/// would take it back using `into_parts`.
90+
///
91+
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
92+
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
93+
/// to work with this function; or use the `without_shutdown` wrapper.
94+
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
95+
self.inner.as_mut().expect("algready upgraded").poll_without_shutdown(cx)
96+
}
97+
}
98+
4399
/// A builder to configure an HTTP connection.
44100
///
45101
/// After setting options, the builder is used to create a handshake future.
@@ -61,11 +117,14 @@ pub struct Builder {
61117
///
62118
/// This is a shortcut for `Builder::new().handshake(io)`.
63119
/// See [`client::conn`](crate::client::conn) for more.
64-
pub async fn handshake<T>(
120+
pub async fn handshake<T, B>(
65121
io: T,
66-
) -> crate::Result<(SendRequest<crate::Recv>, Connection<T, crate::Recv>)>
122+
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
67123
where
68124
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
125+
B: Body + 'static,
126+
B::Data: Send,
127+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
69128
{
70129
Builder::new().handshake(io).await
71130
}
@@ -80,6 +139,13 @@ impl<B> SendRequest<B> {
80139
self.dispatch.poll_ready(cx)
81140
}
82141

142+
/// Waits until the dispatcher is ready
143+
///
144+
/// If the associated connection is closed, this returns an Error.
145+
pub async fn ready(&mut self) -> crate::Result<()> {
146+
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
147+
}
148+
83149
/*
84150
pub(super) async fn when_ready(self) -> crate::Result<Self> {
85151
let mut me = Some(self);

src/client/conn/http2.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,14 @@ pub struct Builder {
5151
///
5252
/// This is a shortcut for `Builder::new().handshake(io)`.
5353
/// See [`client::conn`](crate::client::conn) for more.
54-
pub async fn handshake<T>(
54+
pub async fn handshake<T, B>(
5555
io: T,
56-
) -> crate::Result<(SendRequest<crate::Recv>, Connection<T, crate::Recv>)>
56+
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
5757
where
5858
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
59+
B: Body + 'static,
60+
B::Data: Send,
61+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
5962
{
6063
Builder::new().handshake(io).await
6164
}
@@ -74,6 +77,13 @@ impl<B> SendRequest<B> {
7477
}
7578
}
7679

80+
/// Waits until the dispatcher is ready
81+
///
82+
/// If the associated connection is closed, this returns an Error.
83+
pub async fn ready(&mut self) -> crate::Result<()> {
84+
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
85+
}
86+
7787
/*
7888
pub(super) async fn when_ready(self) -> crate::Result<Self> {
7989
let mut me = Some(self);
@@ -171,6 +181,27 @@ impl<B> fmt::Debug for SendRequest<B> {
171181

172182
// ===== impl Connection
173183

184+
impl<T, B> Connection<T, B>
185+
where
186+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
187+
B: Body + Unpin + Send + 'static,
188+
B::Data: Send,
189+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
190+
{
191+
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
192+
///
193+
/// This setting is configured by the server peer by sending the
194+
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
195+
/// This method returns the currently acknowledged value received from the
196+
/// remote.
197+
///
198+
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
199+
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
200+
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
201+
self.inner.1.is_extended_connect_protocol_enabled()
202+
}
203+
}
204+
174205
impl<T, B> fmt::Debug for Connection<T, B>
175206
where
176207
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,

0 commit comments

Comments
 (0)