Skip to content

Commit

Permalink
optimize dns lookup and add delay when reconnecting
Browse files Browse the repository at this point in the history
do it once instead of doing it for each connection
  • Loading branch information
programatik29 committed Sep 9, 2021
1 parent 17d9acf commit afdef24
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn start_tasks(
bench_type: BenchType,
predicted_size: usize,
) -> Result<Vec<Handle>, AnyError> {
let client = proto::parse::get_client(time_for, uri_string, bench_type, predicted_size)?;
let client = proto::parse::get_client(time_for, uri_string, bench_type, predicted_size).await?;

let mut handles: Vec<Handle> = Vec::with_capacity(connections);

Expand Down
6 changes: 4 additions & 2 deletions src/proto/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::net::TcpStream;
use tokio::time::sleep;

use hyper::client::conn;
use hyper::{Body, StatusCode};
Expand Down Expand Up @@ -147,15 +148,16 @@ where
Ok(val) => return Ok(val),
Err(_) => (),
}

sleep(Duration::from_millis(200)).await;
}

Err("connection closed".into())
}

async fn connect(&self, counter: &Arc<AtomicUsize>) -> Result<Connection, AnyError> {
let host_port = format!("{}:{}", self.parsed_uri.host, self.parsed_uri.port);
let stream = TcpStream::connect(&self.parsed_uri.addr).await?;

let stream = TcpStream::connect(&host_port).await?;
let stream = CustomTcpStream::new(stream, counter.clone());

let connection = self.connector.handshake(stream, self.protocol).await?;
Expand Down
7 changes: 3 additions & 4 deletions src/proto/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::proto::{
ParsedUri, Scheme,
};

use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -25,7 +24,7 @@ impl ClientBuilder {
}

fn uri_host(&self) -> &str {
&self.parsed_uri.host
&self.parsed_uri.uri.host().unwrap()
}

fn uri_scheme(&self) -> Scheme {
Expand All @@ -47,13 +46,13 @@ impl ClientBuilder {
}
}

pub fn get_client(
pub async fn get_client(
time_for: Duration,
uri_string: String,
bench_type: BenchType,
predicted_size: usize,
) -> Result<Arc<dyn Client>, AnyError> {
let parsed_uri = ParsedUri::try_from(uri_string)?;
let parsed_uri = ParsedUri::parse_and_lookup(&uri_string).await?;

let builder = ClientBuilder::new(time_for, predicted_size, parsed_uri);

Expand Down
44 changes: 20 additions & 24 deletions src/proto/uri.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::AnyError;

use std::convert::TryFrom;
use std::net::SocketAddr;
use std::str::FromStr;

use hyper::Uri;
Expand Down Expand Up @@ -41,46 +41,42 @@ impl From<Option<&str>> for Scheme {
pub struct ParsedUri {
pub uri: Uri,
pub scheme: Scheme,
pub host: String,
pub port: u16,
pub addr: SocketAddr,
}

impl TryFrom<&str> for ParsedUri {
type Error = AnyError;

fn try_from(s: &str) -> Result<Self, Self::Error> {
impl ParsedUri {
pub async fn parse_and_lookup(s: &str) -> Result<Self, AnyError> {
let uri = Uri::from_str(&s)?;

let scheme = Scheme::from(uri.scheme_str());

let host = uri.host().ok_or("cant find host")?.to_owned();
let host = uri.host().ok_or("cant find host")?;

let port = match uri.port_u16() {
Some(port) => port,
None => scheme.default_port(),
};

Ok(ParsedUri {
uri,
scheme,
host,
port,
})
let addr = get_preferred_ip(host, port).await?;

Ok(ParsedUri { uri, scheme, addr })
}
}

impl TryFrom<String> for ParsedUri {
type Error = AnyError;
async fn get_preferred_ip(host: &str, port: u16) -> Result<SocketAddr, AnyError> {
let mut addrs = tokio::net::lookup_host((host, port)).await?;

fn try_from(s: String) -> Result<Self, Self::Error> {
Self::try_from(s.as_str())
}
}
let mut res = Err("host lookup failed".into());

impl TryFrom<&String> for ParsedUri {
type Error = AnyError;
while let Some(addr) = addrs.next() {
if addr.is_ipv4() {
return Ok(addr);
}

fn try_from(s: &String) -> Result<Self, Self::Error> {
Self::try_from(s.as_str())
if res.is_err() {
res = Ok(addr);
}
}

res
}

0 comments on commit afdef24

Please sign in to comment.