Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix http upgrades #1980

Merged
merged 3 commits into from
Nov 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix http upgrades
  • Loading branch information
dr-bonez committed Nov 27, 2022
commit 222ef3e0e25e4bfdf9d6036587d3e9cdf803f7cc
2 changes: 1 addition & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ embassy_container_init = { path = "../libs/embassy_container_init" }
hex = "0.4.3"
hmac = "0.12.1"
http = "0.2.8"
hyper = "0.14.20"
hyper = { version = "0.14.20", features = ["full"] }
hyper-ws-listener = "0.2.0"
imbl = "2.0.0"
indexmap = { version = "1.9.1", features = ["serde"] }
Expand Down
4 changes: 1 addition & 3 deletions backend/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use hyper::{Body, Client, Error as HyperError, Request, Response};
use hyper::{Body, Error as HyperError, Request, Response};
use indexmap::IndexSet;
use rpc_toolkit::command;

Expand Down Expand Up @@ -51,5 +51,3 @@ pub struct GeneratedCertificateMountPoint(());
pub type HttpHandler = Arc<
dyn Fn(Request<Body>) -> BoxFuture<'static, Result<Response<Body>, HyperError>> + Send + Sync,
>;

pub type HttpClient = Client<hyper::client::HttpConnector>;
126 changes: 52 additions & 74 deletions backend/src/net/proxy_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ use std::sync::Arc;

use color_eyre::eyre::eyre;
use futures::FutureExt;
use http::{Method, Request, Response};
use hyper::upgrade::Upgraded;
use http::uri::{Authority, Scheme};
use http::{Request, Response, Uri};
use hyper::{Body, Error as HyperError};
use models::{InterfaceId, PackageId};
use openssl::pkey::{PKey, Private};
use openssl::x509::X509;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tracing::{error, info, instrument};
use tracing::{error, instrument};

use crate::net::net_utils::ResourceFqdn;
use crate::net::ssl::SslManager;
use crate::net::vhost_controller::VHOSTController;
use crate::net::{HttpClient, HttpHandler, InterfaceMetadata, PackageNetInfo};
use crate::net::{HttpHandler, InterfaceMetadata, PackageNetInfo};
use crate::{Error, ResultExt};

pub struct ProxyController {
Expand Down Expand Up @@ -85,59 +84,54 @@ impl ProxyController {
self.inner.lock().await.get_embassy_hostname()
}

pub async fn proxy(
client: HttpClient,
req: Request<Body>,
async fn proxy(
client: &hyper::Client<hyper::client::HttpConnector>,
mut req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, HyperError> {
if Method::CONNECT == req.method() {
// Received an HTTP request like:
// ```
// CONNECT www.domain.com:443 HTTP/1.1s
// Host: www.domain.com:443
// Proxy-Connection: Keep-Alive
// ```
//
// When HTTP method is CONNECT we should return an empty body
// then we can eventually upgrade the connection and talk a new protocol.
//
// Note: only after client received an empty body with STATUS_OK can the
// connection be upgraded, so we can't return a response inside
// `on_upgrade` future.

tokio::task::spawn(async move {
let addr = req.uri().clone();

match hyper::upgrade::on(req).await {
Ok(upgraded) => {
if let Err(e) = Self::tunnel(upgraded, addr.to_string()).await {
error!("server io error: {}", e);
}
}
Err(e) => error!("upgrade error: {}", e),
let mut uri = std::mem::take(req.uri_mut()).into_parts();
uri.scheme = Some(Scheme::HTTP);
uri.authority = req
.headers()
.get(http::header::HOST)
.and_then(|h| h.to_str().ok())
// .and_then(|h| Authority::from_str(h).ok());
.and_then(|_| Authority::from_str(&addr.to_string()).ok());
match Uri::from_parts(uri) {
Ok(uri) => *req.uri_mut() = uri,
Err(e) => error!("Error rewriting uri: {}", e),
}
let addr = dbg!(req.uri().to_string());
if req
.headers()
.get("connection")
.and_then(|c| c.to_str().ok())
.map(|c| {
c.split(",")
.any(|c| c.trim().eq_ignore_ascii_case("upgrade"))
})
.unwrap_or(false)
{
let upgraded_req = hyper::upgrade::on(&mut req);
let mut res = client.request(req).await?;
let upgraded_res = hyper::upgrade::on(&mut res);
tokio::spawn(async move {
if let Err(e) = async {
let mut req = upgraded_req.await?;
let mut res = upgraded_res.await?;
tokio::io::copy_bidirectional(&mut req, &mut res).await?;
Ok::<_, color_eyre::eyre::Report>(())
}
.await
{
error!("error binding together tcp streams for {}: {}", addr, e);
}
});

Ok(Response::new(Body::empty()))
Ok(res)
} else {
client.request(req).await
}
}

// Create a TCP connection to host:port, build a tunnel between the connection and
// the upgraded connection
async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> {
let mut server = TcpStream::connect(addr).await?;

let (from_client, from_server) =
tokio::io::copy_bidirectional(&mut upgraded, &mut server).await?;

info!(
"client wrote {} bytes and received {} bytes",
from_client, from_server
);

Ok(())
}
}
struct ProxyControllerInner {
ssl_manager: SslManager,
Expand Down Expand Up @@ -263,7 +257,7 @@ impl ProxyControllerInner {
.await?;

let svc_handler =
Self::create_docker_handle(docker_ipv4.to_string(), lan_port_config.internal)
Self::create_docker_handle((docker_ipv4, lan_port_config.internal).into())
.await;

self.add_handle(
Expand All @@ -282,28 +276,12 @@ impl ProxyControllerInner {
Ok(())
}

async fn create_docker_handle(internal_ip: String, port: u16) -> HttpHandler {
let svc_handler: HttpHandler = Arc::new(move |mut req| {
let proxy_addr = internal_ip.clone();
async move {
let client = HttpClient::new();

let uri_string = format!(
"http://{}:{}{}",
proxy_addr,
port,
req.uri()
.path_and_query()
.map(|x| x.as_str())
.unwrap_or("/")
);

let uri = uri_string.parse().unwrap();
*req.uri_mut() = uri;

ProxyController::proxy(client, req).await
}
.boxed()
async fn create_docker_handle(internal_addr: SocketAddr) -> HttpHandler {
let svc_handler: HttpHandler = Arc::new(move |req| {
let client = hyper::client::Client::builder()
.set_host(false)
.build_http();
async move { ProxyController::proxy(&client, req, internal_addr).await }.boxed()
});

svc_handler
Expand Down