Skip to content

Commit

Permalink
perf(client): change HttpConnecting to hold Arc<Config> instead of in…
Browse files Browse the repository at this point in the history
…lined fields
  • Loading branch information
seanmonstar committed Oct 23, 2019
1 parent f71304b commit 8b878a8
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 50 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ path = "examples/web_api.rs"
required-features = ["runtime", "unstable-stream"]


[[bench]]
name = "connect"
path = "benches/connect.rs"
required-features = ["runtime"]

[[bench]]
name = "end_to_end"
path = "benches/end_to_end.rs"
Expand Down
34 changes: 34 additions & 0 deletions benches/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#![feature(test)]
#![deny(warnings)]

extern crate test;

use tokio::net::TcpListener;
use tokio::runtime::current_thread::Runtime;
use hyper::client::connect::{Destination, HttpConnector};
use hyper::service::Service;
use http::Uri;

#[bench]
fn http_connector(b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let mut listener = rt.block_on(TcpListener::bind("127.0.0.1:0")).expect("bind");
let addr = listener.local_addr().expect("local_addr");
let uri: Uri = format!("http://{}/", addr).parse().expect("uri parse");
let dst = Destination::try_from_uri(uri).expect("destination");
let mut connector = HttpConnector::new();

rt.spawn(async move {
loop {
let _ = listener.accept().await;
}
});


b.iter(|| {
rt.block_on(async {
connector.call(dst.clone()).await.expect("connect");
});
});
}
82 changes: 32 additions & 50 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,16 @@ impl<R> HttpConnector<R> {
}
}

impl<R: Resolve> HttpConnector<R> {
fn invalid_url(&self, err: InvalidUrl) -> HttpConnecting<R> {
HttpConnecting {
config: self.config.clone(),
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
port: 0,
}
}
}

// R: Debug required for now to allow adding it to debug output later...
impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -263,32 +273,25 @@ where

if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp);
return self.invalid_url(InvalidUrl::NotHttp);
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme);
return self.invalid_url(InvalidUrl::MissingScheme);
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority),
None => return self.invalid_url(InvalidUrl::MissingAuthority),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
config: self.config.clone(),
state: State::Lazy(self.resolver.clone(), host.into()),
port,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
}
}
}
Expand Down Expand Up @@ -321,21 +324,6 @@ impl HttpInfo {
}
}

fn invalid_url<R: Resolve>(err: InvalidUrl) -> HttpConnecting<R> {
HttpConnecting {
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
handle: None,
keep_alive_timeout: None,
nodelay: false,
port: 0,
connect_timeout: None,
happy_eyeballs_timeout: None,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}
}

#[derive(Debug, Clone, Copy)]
enum InvalidUrl {
MissingScheme,
Expand All @@ -362,23 +350,16 @@ impl StdError for InvalidUrl {
#[must_use = "futures do nothing unless polled"]
#[pin_project]
pub struct HttpConnecting<R: Resolve = GaiResolver> {
config: Arc<Config>,
#[pin]
state: State<R>,
handle: Option<Handle>,
connect_timeout: Option<Duration>,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
nodelay: bool,
port: u16,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}

#[pin_project]
enum State<R: Resolve> {
Lazy(R, String, Option<IpAddr>),
Resolving(#[pin] R::Future, Option<IpAddr>),
Lazy(R, String),
Resolving(#[pin] R::Future),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
Expand All @@ -389,47 +370,48 @@ impl<R: Resolve> Future for HttpConnecting<R> {
#[project]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
let config: &Config = &me.config;
loop {
let state;
#[project]
match me.state.as_mut().project() {
State::Lazy(ref resolver, ref mut host, ref local_addr) => {
State::Lazy(ref resolver, ref mut host) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, *me.port) {
state = State::Connecting(ConnectingTcp::new(
**local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address));
config.local_address, addrs, config.connect_timeout, config.happy_eyeballs_timeout, config.reuse_address));
} else {
let name = dns::Name::new(mem::replace(host, String::new()));
state = State::Resolving(resolver.resolve(name), **local_addr);
state = State::Resolving(resolver.resolve(name));
}
},
State::Resolving(future, local_addr) => {
State::Resolving(future) => {
let addrs = ready!(future.poll(cx))?;
let port = *me.port;
let addrs = addrs
.map(|addr| SocketAddr::new(addr, port))
.collect();
let addrs = dns::IpAddrs::new(addrs);
state = State::Connecting(ConnectingTcp::new(
*local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address));
config.local_address, addrs, config.connect_timeout, config.happy_eyeballs_timeout, config.reuse_address));
},
State::Connecting(ref mut c) => {
let sock = ready!(c.poll(cx, &me.handle))?;
let sock = ready!(c.poll(cx, &config.handle))?;

if let Some(dur) = me.keep_alive_timeout {
sock.set_keepalive(Some(*dur))?;
if let Some(dur) = config.keep_alive_timeout {
sock.set_keepalive(Some(dur))?;
}

if let Some(size) = me.send_buffer_size {
sock.set_send_buffer_size(*size)?;
if let Some(size) = config.send_buffer_size {
sock.set_send_buffer_size(size)?;
}

if let Some(size) = me.recv_buffer_size {
sock.set_recv_buffer_size(*size)?;
if let Some(size) = config.recv_buffer_size {
sock.set_recv_buffer_size(size)?;
}

sock.set_nodelay(*me.nodelay)?;
sock.set_nodelay(config.nodelay)?;

let extra = HttpInfo {
remote_addr: sock.peer_addr()?,
Expand Down

0 comments on commit 8b878a8

Please sign in to comment.