Skip to content

Commit

Permalink
feat(transport): Add Endpoint::connect_lazy method (#392)
Browse files Browse the repository at this point in the history
Fixes #167
  • Loading branch information
danburkert authored Jul 10, 2020
1 parent ea7fe66 commit ec9046d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 17 deletions.
19 changes: 19 additions & 0 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ impl Endpoint {
Channel::connect(connector, self.clone()).await
}

/// Create a channel from this config.
///
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
pub fn connect_lazy(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);

#[cfg(feature = "tls")]
let connector = service::connector(http, self.tls.clone());

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);

Channel::new(connector, self.clone())
}

/// Connect with a custom connector.
pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
where
Expand Down
20 changes: 17 additions & 3 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ impl Channel {
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

let svc = Connection::new(connector, endpoint).map_err(super::Error::from_source)?;
let svc = Buffer::new(Either::A(svc), buffer_size);

Ok(Channel { svc })
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
where
C: Service<Uri> + Send + 'static,
Expand All @@ -139,10 +154,9 @@ impl Channel {
{
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

let svc = Connection::new(connector, endpoint)
let svc = Connection::connect(connector, endpoint)
.await
.map_err(|e| super::Error::from_source(e))?;

.map_err(super::Error::from_source)?;
let svc = Buffer::new(Either::A(svc), buffer_size);

Ok(Channel { svc })
Expand Down
19 changes: 14 additions & 5 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tower::{
limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
timeout::TimeoutLayer,
util::BoxService,
ServiceBuilder,
ServiceBuilder, ServiceExt,
};
use tower_load::Load;
use tower_service::Service;
Expand All @@ -29,7 +29,7 @@ pub(crate) struct Connection {
}

impl Connection {
pub(crate) async fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
Expand Down Expand Up @@ -60,16 +60,25 @@ impl Connection {
.optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
.into_inner();

let mut connector = HyperConnect::new(connector, settings);
let initial_conn = connector.call(endpoint.uri.clone()).await?;
let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone());
let connector = HyperConnect::new(connector, settings);
let conn = Reconnect::new(connector, endpoint.uri.clone());

let inner = stack.layer(conn);

Ok(Self {
inner: BoxService::new(inner),
})
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
Self::new(connector, endpoint)?.ready_oneshot().await
}
}

impl Service<Request> for Connection {
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/service/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<K: Hash + Eq + Clone> Discover for DynamicServiceStream<K> {

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);
let fut = Connection::new(connector, endpoint);
let fut = Connection::connect(connector, endpoint);
self.connecting = Some((k, Box::pin(fut)));
continue;
}
Expand Down
10 changes: 2 additions & 8 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,10 @@ impl<M, Target> Reconnect<M, Target>
where
M: Service<Target>,
{
pub(crate) fn new<S, Request>(initial_connection: S, mk_service: M, target: Target) -> Self
where
M: Service<Target, Response = S>,
S: Service<Request>,
Error: From<M::Error> + From<S::Error>,
Target: Clone,
{
pub(crate) fn new(mk_service: M, target: Target) -> Self {
Reconnect {
mk_service,
state: State::Connected(initial_connection),
state: State::Idle,
target,
error: None,
}
Expand Down

0 comments on commit ec9046d

Please sign in to comment.