Skip to content

Commit 4179297

Browse files
sfacklerseanmonstar
authored andcommitted
feat(client): Add connect timeout to HttpConnector (#1972)
This takes the same strategy as golang, where the timeout value is divided equally between the candidate socket addresses. If happy eyeballs is enabled, the division takes place "below" the IPv4/IPv6 partitioning.
1 parent 536b1e1 commit 4179297

File tree

2 files changed

+53
-16
lines changed

2 files changed

+53
-16
lines changed

src/client/connect/dns.rs

+4
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,10 @@ impl IpAddrs {
235235
pub(super) fn is_empty(&self) -> bool {
236236
self.iter.as_slice().is_empty()
237237
}
238+
239+
pub(super) fn len(&self) -> usize {
240+
self.iter.as_slice().len()
241+
}
238242
}
239243

240244
impl Iterator for IpAddrs {

src/client/connect/http.rs

+49-16
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures_util::{TryFutureExt, FutureExt};
1010
use net2::TcpBuilder;
1111
use tokio_net::driver::Handle;
1212
use tokio_net::tcp::TcpStream;
13-
use tokio_timer::Delay;
13+
use tokio_timer::{Delay, Timeout};
1414

1515
use crate::common::{Future, Pin, Poll, task};
1616
use super::{Connect, Connected, Destination};
@@ -32,6 +32,7 @@ type ConnectFuture = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>
3232
pub struct HttpConnector<R = GaiResolver> {
3333
enforce_http: bool,
3434
handle: Option<Handle>,
35+
connect_timeout: Option<Duration>,
3536
happy_eyeballs_timeout: Option<Duration>,
3637
keep_alive_timeout: Option<Duration>,
3738
local_address: Option<IpAddr>,
@@ -101,6 +102,7 @@ impl<R> HttpConnector<R> {
101102
HttpConnector {
102103
enforce_http: true,
103104
handle: None,
105+
connect_timeout: None,
104106
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
105107
keep_alive_timeout: None,
106108
local_address: None,
@@ -168,6 +170,17 @@ impl<R> HttpConnector<R> {
168170
self.local_address = addr;
169171
}
170172

173+
/// Set the connect timeout.
174+
///
175+
/// If a domain resolves to multiple IP addresses, the timeout will be
176+
/// evenly divided across them.
177+
///
178+
/// Default is `None`.
179+
#[inline]
180+
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
181+
self.connect_timeout = dur;
182+
}
183+
171184
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
172185
///
173186
/// If hostname resolves to both IPv4 and IPv6 addresses and connection
@@ -240,6 +253,7 @@ where
240253
HttpConnecting {
241254
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
242255
handle: self.handle.clone(),
256+
connect_timeout: self.connect_timeout,
243257
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
244258
keep_alive_timeout: self.keep_alive_timeout,
245259
nodelay: self.nodelay,
@@ -295,6 +309,7 @@ where
295309
let fut = HttpConnecting {
296310
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
297311
handle: self.handle.clone(),
312+
connect_timeout: self.connect_timeout,
298313
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
299314
keep_alive_timeout: self.keep_alive_timeout,
300315
nodelay: self.nodelay,
@@ -323,6 +338,7 @@ fn invalid_url<R: Resolve>(err: InvalidUrl, handle: &Option<Handle>) -> HttpConn
323338
keep_alive_timeout: None,
324339
nodelay: false,
325340
port: 0,
341+
connect_timeout: None,
326342
happy_eyeballs_timeout: None,
327343
reuse_address: false,
328344
send_buffer_size: None,
@@ -357,6 +373,7 @@ impl StdError for InvalidUrl {
357373
pub struct HttpConnecting<R: Resolve = GaiResolver> {
358374
state: State<R>,
359375
handle: Option<Handle>,
376+
connect_timeout: Option<Duration>,
360377
happy_eyeballs_timeout: Option<Duration>,
361378
keep_alive_timeout: Option<Duration>,
362379
nodelay: bool,
@@ -389,7 +406,7 @@ where
389406
// skip resolving the dns and start connecting right away.
390407
if let Some(addrs) = dns::IpAddrs::try_parse(host, me.port) {
391408
state = State::Connecting(ConnectingTcp::new(
392-
local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address));
409+
local_addr, addrs, me.connect_timeout, me.happy_eyeballs_timeout, me.reuse_address));
393410
} else {
394411
let name = dns::Name::new(mem::replace(host, String::new()));
395412
state = State::Resolving(resolver.resolve(name), local_addr);
@@ -403,7 +420,7 @@ where
403420
.collect();
404421
let addrs = dns::IpAddrs::new(addrs);
405422
state = State::Connecting(ConnectingTcp::new(
406-
local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address));
423+
local_addr, addrs, me.connect_timeout, me.happy_eyeballs_timeout, me.reuse_address));
407424
},
408425
State::Connecting(ref mut c) => {
409426
let sock = ready!(c.poll(cx, &me.handle))?;
@@ -454,6 +471,7 @@ impl ConnectingTcp {
454471
fn new(
455472
local_addr: Option<IpAddr>,
456473
remote_addrs: dns::IpAddrs,
474+
connect_timeout: Option<Duration>,
457475
fallback_timeout: Option<Duration>,
458476
reuse_address: bool,
459477
) -> ConnectingTcp {
@@ -462,25 +480,25 @@ impl ConnectingTcp {
462480
if fallback_addrs.is_empty() {
463481
return ConnectingTcp {
464482
local_addr,
465-
preferred: ConnectingTcpRemote::new(preferred_addrs),
483+
preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
466484
fallback: None,
467485
reuse_address,
468486
};
469487
}
470488

471489
ConnectingTcp {
472490
local_addr,
473-
preferred: ConnectingTcpRemote::new(preferred_addrs),
491+
preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
474492
fallback: Some(ConnectingTcpFallback {
475493
delay: tokio_timer::delay_for(fallback_timeout),
476-
remote: ConnectingTcpRemote::new(fallback_addrs),
494+
remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout),
477495
}),
478496
reuse_address,
479497
}
480498
} else {
481499
ConnectingTcp {
482500
local_addr,
483-
preferred: ConnectingTcpRemote::new(remote_addrs),
501+
preferred: ConnectingTcpRemote::new(remote_addrs, connect_timeout),
484502
fallback: None,
485503
reuse_address,
486504
}
@@ -495,13 +513,17 @@ struct ConnectingTcpFallback {
495513

496514
struct ConnectingTcpRemote {
497515
addrs: dns::IpAddrs,
516+
connect_timeout: Option<Duration>,
498517
current: Option<ConnectFuture>,
499518
}
500519

501520
impl ConnectingTcpRemote {
502-
fn new(addrs: dns::IpAddrs) -> Self {
521+
fn new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self {
522+
let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
523+
503524
Self {
504525
addrs,
526+
connect_timeout,
505527
current: None,
506528
}
507529
}
@@ -530,14 +552,14 @@ impl ConnectingTcpRemote {
530552
err = Some(e);
531553
if let Some(addr) = self.addrs.next() {
532554
debug!("connecting to {}", addr);
533-
*current = connect(&addr, local_addr, handle, reuse_address)?;
555+
*current = connect(&addr, local_addr, handle, reuse_address, self.connect_timeout)?;
534556
continue;
535557
}
536558
}
537559
}
538560
} else if let Some(addr) = self.addrs.next() {
539561
debug!("connecting to {}", addr);
540-
self.current = Some(connect(&addr, local_addr, handle, reuse_address)?);
562+
self.current = Some(connect(&addr, local_addr, handle, reuse_address, self.connect_timeout)?);
541563
continue;
542564
}
543565

@@ -546,7 +568,13 @@ impl ConnectingTcpRemote {
546568
}
547569
}
548570

549-
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>, reuse_address: bool) -> io::Result<ConnectFuture> {
571+
fn connect(
572+
addr: &SocketAddr,
573+
local_addr: &Option<IpAddr>,
574+
handle: &Option<Handle>,
575+
reuse_address: bool,
576+
connect_timeout: Option<Duration>,
577+
) -> io::Result<ConnectFuture> {
550578
let builder = match addr {
551579
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
552580
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
@@ -581,10 +609,16 @@ fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handl
581609
let std_tcp = builder.to_tcp_stream()?;
582610

583611
Ok(Box::pin(async move {
584-
TcpStream::connect_std(std_tcp, &addr, &handle).await
612+
let connect = TcpStream::connect_std(std_tcp, &addr, &handle);
613+
match connect_timeout {
614+
Some(timeout) => match Timeout::new(connect, timeout).await {
615+
Ok(Ok(s)) => Ok(s),
616+
Ok(Err(e)) => Err(e),
617+
Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
618+
}
619+
None => connect.await,
620+
}
585621
}))
586-
587-
//Ok(Box::pin(TcpStream::connect_std(std_tcp, addr, &handle)))
588622
}
589623

590624
impl ConnectingTcp {
@@ -673,7 +707,6 @@ mod tests {
673707
})
674708
}
675709

676-
677710
#[test]
678711
fn test_errors_missing_scheme() {
679712
let mut rt = Runtime::new().unwrap();
@@ -765,7 +798,7 @@ mod tests {
765798
}
766799

767800
let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect();
768-
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false);
801+
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), None, Some(fallback_timeout), false);
769802
let fut = ConnectingTcpFuture(connecting_tcp);
770803

771804
let start = Instant::now();

0 commit comments

Comments
 (0)