Skip to content

feat(upgrade): Moved HTTP upgrades off Body to a new API #2337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn proxy(client: HttpClient, req: Request<Body>) -> Result<Response<Body>,
// `on_upgrade` future.
if let Some(addr) = host_addr(req.uri()) {
tokio::task::spawn(async move {
match req.into_body().on_upgrade().await {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
if let Err(e) = tunnel(upgraded, addr).await {
eprintln!("server io error: {}", e);
Expand Down
6 changes: 3 additions & 3 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
}

/// Our server HTTP handler to initiate HTTP upgrades.
async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
async fn server_upgrade(mut req: Request<Body>) -> Result<Response<Body>> {
let mut res = Response::new(Body::empty());

// Send a 400 to any request that doesn't have
Expand All @@ -52,7 +52,7 @@ async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
// is returned below, so it's better to spawn this future instead
// waiting for it to complete to then return a response.
tokio::task::spawn(async move {
match req.into_body().on_upgrade().await {
match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
if let Err(e) = server_upgraded_io(upgraded).await {
eprintln!("server foobar io error: {}", e)
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
panic!("Our server didn't upgrade: {}", res.status());
}

match res.into_body().on_upgrade().await {
match hyper::upgrade::on(res).await {
Ok(upgraded) => {
if let Err(e) = client_upgraded_io(upgraded).await {
eprintln!("client foobar io error: {}", e)
Expand Down
16 changes: 9 additions & 7 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,15 @@ impl Body {
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
///
/// See [the `upgrade` module](crate::upgrade) for more.
pub fn on_upgrade(self) -> OnUpgrade {
self.extra
.map(|ex| ex.on_upgrade)
.unwrap_or_else(OnUpgrade::none)
// TODO: Eventually the pending upgrade should be stored in the
// `Extensions`, and all these pieces can be removed. In v0.14, we made
// the breaking changes, so now this TODO can be done without breakage.
pub(crate) fn take_upgrade(&mut self) -> OnUpgrade {
if let Some(ref mut extra) = self.extra {
std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none())
} else {
OnUpgrade::none()
}
}

fn new(kind: Kind) -> Body {
Expand Down
57 changes: 46 additions & 11 deletions src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,16 @@ pub struct Parts<T> {
_inner: (),
}

/// Gets a pending HTTP upgrade from this message.
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
msg.on_upgrade()
}

#[cfg(feature = "http1")]
pub(crate) struct Pending {
tx: oneshot::Sender<crate::Result<Upgraded>>,
}

/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected(());

#[cfg(feature = "http1")]
pub(crate) fn pending() -> (Pending, OnUpgrade) {
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -162,9 +160,7 @@ impl Future for OnUpgrade {
Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res {
Ok(Ok(upgraded)) => Ok(upgraded),
Ok(Err(err)) => Err(err),
Err(_oneshot_canceled) => {
Err(crate::Error::new_canceled().with(UpgradeExpected(())))
}
Err(_oneshot_canceled) => Err(crate::Error::new_canceled().with(UpgradeExpected)),
}),
None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())),
}
Expand Down Expand Up @@ -196,9 +192,16 @@ impl Pending {

// ===== impl UpgradeExpected =====

/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected;

impl fmt::Display for UpgradeExpected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "upgrade expected but not completed")
f.write_str("upgrade expected but not completed")
}
}

Expand Down Expand Up @@ -277,6 +280,38 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Io for ForwardsWriteBuf<T> {
}
}

mod sealed {
use super::OnUpgrade;

pub trait CanUpgrade {
fn on_upgrade(self) -> OnUpgrade;
}

impl CanUpgrade for http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}

impl CanUpgrade for &'_ mut http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}

impl CanUpgrade for http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}

impl CanUpgrade for &'_ mut http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 1 addition & 3 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1790,9 +1790,7 @@ mod dispatch_impl {
let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();

assert_eq!(res.status(), 101);
let upgraded = rt
.block_on(res.into_body().on_upgrade())
.expect("on_upgrade");
let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade");

let parts = upgraded.downcast::<DebugStream>().unwrap();
assert_eq!(s(&parts.read_buf), "foobar=ready");
Expand Down
4 changes: 2 additions & 2 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ async fn upgrades_new() {

let (upgrades_tx, upgrades_rx) = mpsc::channel();
let svc = service_fn(move |req: Request<Body>| {
let on_upgrade = req.into_body().on_upgrade();
let on_upgrade = hyper::upgrade::on(req);
let _ = upgrades_tx.send(on_upgrade);
future::ok::<_, hyper::Error>(
Response::builder()
Expand Down Expand Up @@ -1448,7 +1448,7 @@ async fn http_connect_new() {

let (upgrades_tx, upgrades_rx) = mpsc::channel();
let svc = service_fn(move |req: Request<Body>| {
let on_upgrade = req.into_body().on_upgrade();
let on_upgrade = hyper::upgrade::on(req);
let _ = upgrades_tx.send(on_upgrade);
future::ok::<_, hyper::Error>(
Response::builder()
Expand Down