Skip to content

Commit

Permalink
remove ip addr
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Aug 9, 2024
1 parent eac1b1b commit 69e30c6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 78 deletions.
55 changes: 3 additions & 52 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use std::error::Error as StdError;
use std::future::Future;
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
Expand Down Expand Up @@ -240,8 +240,6 @@ pub struct TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
pub(crate) conn_id: Arc<AtomicU32>,
/// Connection guard.
pub(crate) conn_guard: ConnectionGuard,
/// IP address.
pub(crate) ip_addr: Option<IpAddr>,
}

/// Configuration for batch request handling.
Expand Down Expand Up @@ -463,7 +461,6 @@ pub struct Builder<HttpMiddleware, RpcMiddleware> {
server_cfg: ServerConfig,
rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
http_middleware: tower::ServiceBuilder<HttpMiddleware>,
ip_addr: Option<IpAddr>,
}

impl Default for Builder<Identity, Identity> {
Expand All @@ -472,7 +469,6 @@ impl Default for Builder<Identity, Identity> {
server_cfg: ServerConfig::default(),
rpc_middleware: RpcServiceBuilder::new(),
http_middleware: tower::ServiceBuilder::new(),
ip_addr: None,
}
}
}
Expand All @@ -482,12 +478,6 @@ impl Builder<Identity, Identity> {
pub fn new() -> Self {
Self::default()
}

/// Set the address of the remote peer.
pub fn set_ip_addr(mut self, ip_addr: IpAddr) -> Self {
self.ip_addr = Some(ip_addr);
self
}
}

impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
Expand All @@ -507,7 +497,6 @@ impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddl
conn_id,
conn_guard: self.conn_guard,
server_cfg: self.server_cfg,
ip_addr: self.ip_addr,
},
on_session_close: None,
};
Expand Down Expand Up @@ -537,7 +526,6 @@ impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddl
http_middleware: self.http_middleware,
conn_id: self.conn_id,
conn_guard: self.conn_guard,
ip_addr: self.ip_addr,
}
}

Expand All @@ -552,19 +540,6 @@ impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddl
http_middleware,
conn_id: self.conn_id,
conn_guard: self.conn_guard,
ip_addr: self.ip_addr,
}
}

/// Set the address of the remote peer.
pub fn set_ip_addr(self, ip_addr: IpAddr) -> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
TowerServiceBuilder {
server_cfg: self.server_cfg,
rpc_middleware: self.rpc_middleware,
http_middleware: self.http_middleware,
conn_id: self.conn_id,
conn_guard: self.conn_guard,
ip_addr: Some(ip_addr),
}
}
}
Expand Down Expand Up @@ -662,12 +637,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// let builder = ServerBuilder::default().set_rpc_middleware(m);
/// ```
pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> Builder<HttpMiddleware, T> {
Builder {
server_cfg: self.server_cfg,
rpc_middleware,
http_middleware: self.http_middleware,
ip_addr: self.ip_addr,
}
Builder { server_cfg: self.server_cfg, rpc_middleware, http_middleware: self.http_middleware }
}

/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
Expand Down Expand Up @@ -753,12 +723,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// }
/// ```
pub fn set_http_middleware<T>(self, http_middleware: tower::ServiceBuilder<T>) -> Builder<T, RpcMiddleware> {
Builder {
server_cfg: self.server_cfg,
http_middleware,
rpc_middleware: self.rpc_middleware,
ip_addr: self.ip_addr,
}
Builder { server_cfg: self.server_cfg, http_middleware, rpc_middleware: self.rpc_middleware }
}

/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
Expand Down Expand Up @@ -894,7 +859,6 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
http_middleware: self.http_middleware,
conn_id: Arc::new(AtomicU32::new(0)),
conn_guard: ConnectionGuard::new(max_conns),
ip_addr: self.ip_addr,
}
}

Expand Down Expand Up @@ -976,8 +940,6 @@ struct ServiceData {
conn_guard: ConnectionGuard,
/// ServerConfig
server_cfg: ServerConfig,
/// IP address.
ip_addr: Option<IpAddr>,
}

/// jsonrpsee tower service
Expand Down Expand Up @@ -1087,15 +1049,6 @@ where

request.extensions_mut().insert::<ConnectionId>(conn.conn_id.into());

if let Some(ip_addr) = self.inner.ip_addr {
// Only insert the remote address if it's not already set.
// We expect servers deployed behind a reverse proxy to set the remote address
// themselves otherwise the remote address will be the address of the reverse proxy.
if request.extensions().get::<IpAddr>().is_none() {
request.extensions_mut().insert(ip_addr);
}
}

let is_upgrade_request = is_upgrade_request(&request);

if self.inner.server_cfg.enable_ws && is_upgrade_request {
Expand Down Expand Up @@ -1237,7 +1190,6 @@ where
stop_handle,
drop_on_completion,
methods,
remote_addr,
..
} = params;

Expand All @@ -1253,7 +1205,6 @@ where
stop_handle: stop_handle.clone(),
conn_id,
conn_guard: conn_guard.clone(),
ip_addr: Some(remote_addr.ip()),
},
rpc_middleware,
on_session_close: None,
Expand Down
36 changes: 10 additions & 26 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -78,17 +77,15 @@ where
extensions,
} = params;

// NOTE: jsonrpsee only inject the `remote_addr` if it not set because for servers that are behind a reverse proxy,
// needs read HTTP headers to get the real IP address of the client.
let ip_addr = extensions.get::<IpAddr>().copied();
let conn_id = conn.conn_id;
let ServerConfig { ping_config, batch_requests_config, max_request_body_size, max_response_body_size, .. } =
server_cfg;
let (conn_tx, conn_rx) = oneshot::channel();

// Spawn ping/pong task if ping config is provided.
let ping_config = if let Some(ping_config) = ping_config {
let (ping_tx, ping_rx) = mpsc::channel::<KeepAlive>(4);
tokio::spawn(ping_pong_task(ping_rx, ping_config.inactive_limit, ping_config.max_failures, ip_addr));
tokio::spawn(ping_pong_task(ping_rx, ping_config.inactive_limit, ping_config.max_failures, conn_id));
Some((ping_config, ping_tx))
} else {
None
Expand All @@ -97,7 +94,7 @@ where
let ping_tx = ping_config.as_ref().map(|(_, tx)| tx.clone());

// Spawn another task that sends out the responses on the Websocket.
let send_task_handle = tokio::spawn(send_task(rx, ws_sender, ping_config, conn_rx, ip_addr));
let send_task_handle = tokio::spawn(send_task(rx, ws_sender, ping_config, conn_rx));

let stopped = conn.stop_handle.clone().shutdown();
let rpc_service = Arc::new(rpc_service);
Expand Down Expand Up @@ -200,7 +197,7 @@ where
});
};

tracing::debug!(target: LOG_TARGET, "Connection closed for peer={:?}, reason={:?}", ip_addr, result);
tracing::debug!(target: LOG_TARGET, "Connection closed for conn_id={conn_id}, reason={:?}", result);

// Drive all running methods to completion.
// **NOTE** Do not return early in this function. This `await` needs to run to guarantee
Expand All @@ -221,7 +218,6 @@ async fn send_task(
mut ws_sender: Sender,
ping_config: Option<(PingConfig, mpsc::Sender<KeepAlive>)>,
stop: oneshot::Receiver<()>,
remote_addr: Option<IpAddr>,
) {
use futures_util::future::Either;

Expand All @@ -248,17 +244,11 @@ async fn send_task(
// Received message.
Either::Left((Some(response), not_ready)) => {
// If websocket message send fail then terminate the connection.
let now = Instant::now();
if let Err(err) = send_message(&mut ws_sender, response).await {
tracing::debug!(target: LOG_TARGET, "WS send error: {}", err);
break;
}

if now.elapsed() > Duration::from_secs(30) {
tracing::warn!(target: LOG_TARGET, "Send message was slow {}s, peer={:?}", now.elapsed().as_secs(), remote_addr);
break;
}

rx_item = rx.next();
futs = not_ready;
}
Expand All @@ -271,17 +261,11 @@ async fn send_task(
// Handle timer intervals.
Either::Right((Either::Left((Some(ping_tx), _stopped)), next_rx)) => {
stop = _stopped;
let now = Instant::now();
if let Err(err) = send_ping(&mut ws_sender).await {
tracing::debug!(target: LOG_TARGET, "WS send ping error: {}", err);
break;
}

if now.elapsed() > Duration::from_secs(30) {
tracing::warn!(target: LOG_TARGET, "Send ping was slow {}s, peer={:?}", now.elapsed().as_secs(), remote_addr);
break;
}

rx_item = next_rx;

let ping_tx = ping_tx.expect("ping tx is only `None` if ping_config is `None` checked above; qed");
Expand Down Expand Up @@ -380,7 +364,7 @@ async fn ping_pong_task(
mut rx: mpsc::Receiver<KeepAlive>,
max_inactive_limit: Duration,
max_missed_pings: usize,
remote_addr: Option<IpAddr>,
conn_id: u32,
) {
let mut polling_interval = IntervalStream::new(interval(max_inactive_limit));
let mut pending_pings: VecDeque<Instant> = VecDeque::new();
Expand All @@ -398,11 +382,11 @@ async fn ping_pong_task(
if elapsed >= max_inactive_limit {
missed_pings += 1;
remove = true;
tracing::debug!(target: LOG_TARGET, "Ping/pong keep alive expired for peer={:?}, elapsed={}ms/max={}ms", remote_addr, elapsed.as_millis(), max_inactive_limit.as_millis());
tracing::debug!(target: LOG_TARGET, "Ping/pong keep alive expired for conn_id={conn_id}, elapsed={}ms/max={}ms", elapsed.as_millis(), max_inactive_limit.as_millis());
}

if missed_pings >= max_missed_pings {
tracing::debug!(target: LOG_TARGET, "Missed {missed_pings} ping/pongs for peer={:?}; closing connection", remote_addr);
tracing::debug!(target: LOG_TARGET, "Missed {missed_pings} ping/pongs for conn_id={conn_id}; closing connection");
break;
}
}
Expand All @@ -427,13 +411,13 @@ async fn ping_pong_task(

if elapsed >= max_inactive_limit {
missed_pings += 1;
tracing::debug!(target: LOG_TARGET, "Ping/pong keep alive expired for peer={:?}, elapsed={}ms/max={}ms", remote_addr, elapsed.as_millis(), max_inactive_limit.as_millis());
tracing::debug!(target: LOG_TARGET, "Ping/pong keep alive expired for conn_id={conn_id}, elapsed={}ms/max={}ms", elapsed.as_millis(), max_inactive_limit.as_millis());
}

tracing::trace!(target: LOG_TARGET, "ws_ping_pong_rtt={}ms, peer={:?}", elapsed.as_millis(), remote_addr);
tracing::trace!(target: LOG_TARGET, "ws_ping_pong_rtt={}ms, conn_id={conn_id}", elapsed.as_millis());

if missed_pings >= max_missed_pings {
tracing::debug!(target: LOG_TARGET, "Missed {missed_pings} ping/pongs for peer={:?}; closing connection", remote_addr);
tracing::debug!(target: LOG_TARGET, "Missed {missed_pings} ping/pongs for conn_id={conn_id}; closing connection");
break;
}
}
Expand Down

0 comments on commit 69e30c6

Please sign in to comment.