Skip to content

Commit

Permalink
Add support for binding to local port ranges and retrying on EADDRNOT…
Browse files Browse the repository at this point in the history
…AVAIL
  • Loading branch information
andrewhavck authored and drcaramelsyrup committed Sep 6, 2024
1 parent acffb8a commit d1d7a87
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
28f94f2a402bbf66341bdac8fa670caf5b7311e9
90c70086397a4708a4dadfed6e6915ce6dc33481
206 changes: 195 additions & 11 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,59 @@ pub trait Connect: std::fmt::Debug {
async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
}

/// Settings for binding on connect
#[derive(Clone, Debug, Default)]
pub struct BindTo {
// local ip address
pub addr: Option<InetSocketAddr>,
// port range
port_range: Option<(u16, u16)>,
// whether we fallback and try again on bind errors when a port range is set
fallback: bool,
}

impl BindTo {
/// Sets the port range we will bind to where the first item in the tuple is the lower bound
/// and the second item is the upper bound.
///
/// Note this bind option is only supported on Linux since 6.3, this is a no-op on other systems.
/// To reset the range, pass a `None` or `Some((0,0))`, more information can be found [here](https://man7.org/linux/man-pages/man7/ip.7.html)
pub fn set_port_range(&mut self, range: Option<(u16, u16)>) -> Result<()> {
if range.is_none() && self.port_range.is_none() {
// nothing to do
return Ok(());
}

match range {
// 0,0 is valid for resets
None | Some((0, 0)) => self.port_range = Some((0, 0)),
// set the port range if valid
Some((low, high)) if low > 0 && low < high => {
self.port_range = Some((low, high));
}
_ => return Error::e_explain(SocketError, "invalid port range: {range}"),
}
Ok(())
}

/// Set whether we fallback on no address available if a port range is set
pub fn set_fallback(&mut self, fallback: bool) {
self.fallback = fallback
}

/// Configured bind port range
pub fn port_range(&self) -> Option<(u16, u16)> {
self.port_range
}

/// Whether we attempt to fallback on no address available
pub fn will_fallback(&self) -> bool {
self.fallback && self.port_range.is_some()
}
}

/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub(crate) async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
pub(crate) async fn connect<P>(peer: &P, bind_to: Option<BindTo>) -> Result<Stream>
where
P: Peer + Send + Sync,
{
Expand Down Expand Up @@ -142,12 +193,8 @@ pub(crate) fn bind_to_random<P: Peer>(
peer: &P,
v4_list: &[InetSocketAddr],
v6_list: &[InetSocketAddr],
) -> Option<InetSocketAddr> {
let selected = peer.get_peer_options().and_then(|o| o.bind_to);
if selected.is_some() {
return selected;
}

) -> Option<BindTo> {
// helper function for randomly picking address
fn bind_to_ips(ips: &[InetSocketAddr]) -> Option<InetSocketAddr> {
match ips.len() {
0 => None,
Expand All @@ -159,13 +206,31 @@ pub(crate) fn bind_to_random<P: Peer>(
}
}

match peer.address() {
let mut bind_to = peer.get_peer_options().and_then(|o| o.bind_to.clone());
if bind_to.as_ref().map(|b| b.addr).is_some() {
// already have a bind address selected
return bind_to;
}

let addr = match peer.address() {
SocketAddr::Inet(sockaddr) => match sockaddr {
InetSocketAddr::V4(_) => bind_to_ips(v4_list),
InetSocketAddr::V6(_) => bind_to_ips(v6_list),
},
SocketAddr::Unix(_) => None,
};

if addr.is_some() {
if let Some(bind_to) = bind_to.as_mut() {
bind_to.addr = addr;
} else {
bind_to = Some(BindTo {
addr,
..Default::default()
});
}
}
bind_to
}

use crate::protocols::raw_connect;
Expand Down Expand Up @@ -238,16 +303,25 @@ mod tests {
#[tokio::test]
async fn test_conn_error_addr_not_avail() {
let peer = HttpPeer::new("127.0.0.1:121".to_string(), false, "".to_string());
let new_session = connect(&peer, Some("192.0.2.2:0".parse().unwrap())).await;
let addr = "192.0.2.2:0".parse().ok();
let bind_to = BindTo {
addr,
..Default::default()
};
let new_session = connect(&peer, Some(bind_to)).await;
assert_eq!(new_session.unwrap_err().etype(), &InternalError)
}

#[tokio::test]
async fn test_conn_error_other() {
let peer = HttpPeer::new("240.0.0.1:80".to_string(), false, "".to_string()); // non localhost

let addr = "127.0.0.1:0".parse().ok();
// create an error: cannot send from src addr: localhost to dst addr: a public IP
let new_session = connect(&peer, Some("127.0.0.1:0".parse().unwrap())).await;
let bind_to = BindTo {
addr,
..Default::default()
};
let new_session = connect(&peer, Some(bind_to)).await;
let error = new_session.unwrap_err();
// XXX: some system will allow the socket to bind and connect without error, only to timeout
assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
Expand Down Expand Up @@ -371,4 +445,114 @@ mod tests {
assert_eq!(err.etype(), &ConnectionClosed);
assert!(!err.retry());
}

#[cfg(target_os = "linux")]
#[tokio::test(flavor = "multi_thread")]
async fn test_bind_to_port_range_on_connect() {
fn get_ip_local_port_range() -> (u16, u16) {
let path = "/proc/sys/net/ipv4/ip_local_port_range";
let file = std::fs::read_to_string(path).unwrap();
let mut parts = file.split_whitespace();
(
parts.next().unwrap().parse().unwrap(),
parts.next().unwrap().parse().unwrap(),
)
}

// one-off mock server
async fn mock_inet_connect_server() {
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:10020").await.unwrap();
if let Ok((mut stream, _addr)) = listener.accept().await {
stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await.unwrap();
// wait a bit so that the client can read
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}

fn in_port_range(session: Stream, lower: u16, upper: u16) -> bool {
let digest = session.get_socket_digest();
let local_addr = digest
.as_ref()
.and_then(|s| s.local_addr())
.unwrap()
.as_inet()
.unwrap();

// assert range
local_addr.port() >= lower && local_addr.port() <= upper
}

tokio::spawn(async {
mock_inet_connect_server().await;
});
// wait for the server to start
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// need to read /proc/sys/net/ipv4/ip_local_port_range for this test to work
// IP_LOCAL_PORT_RANGE clamp only works on ports in /proc/sys/net/ipv4/ip_local_port_range
let (low, _) = get_ip_local_port_range();
let high = low + 1;

let peer = HttpPeer::new("127.0.0.1:10020".to_string(), false, "".to_string());
let mut bind_to = BindTo {
addr: "127.0.0.1:0".parse().ok(),
..Default::default()
};
bind_to.set_port_range(Some((low, high))).unwrap();

let session1 = connect(&peer, Some(bind_to.clone())).await.unwrap();
assert!(in_port_range(session1, low, high));

// execute more connect()
let session2 = connect(&peer, Some(bind_to.clone())).await.unwrap();
assert!(in_port_range(session2, low, high));
let session3 = connect(&peer, Some(bind_to.clone())).await.unwrap();
assert!(in_port_range(session3, low, high));

// disabled fallback, should be AddrNotAvailable error
let err = connect(&peer, Some(bind_to.clone())).await.unwrap_err();
assert_eq!(err.etype(), &InternalError);

// enable fallback, assert not in port range but successful
bind_to.set_fallback(true);
let session4 = connect(&peer, Some(bind_to.clone())).await.unwrap();
assert!(!in_port_range(session4, low, high));

// works without bind IP, shift up to use new ports
let low = low + 2;
let high = low + 1;
let mut bind_to = BindTo::default();
bind_to.set_port_range(Some((low, high))).unwrap();
let session5 = connect(&peer, Some(bind_to.clone())).await.unwrap();
assert!(in_port_range(session5, low, high));
}

#[test]
fn test_bind_to_port_ranges() {
let addr = "127.0.0.1:0".parse().ok();
let mut bind_to = BindTo {
addr,
..Default::default()
};

// None because the previous value was None
bind_to.set_port_range(None).unwrap();
assert!(bind_to.port_range.is_none());

// zeroes are handled
bind_to.set_port_range(Some((0, 0))).unwrap();
assert_eq!(bind_to.port_range, Some((0, 0)));

// zeroes because the previous value was Some
bind_to.set_port_range(None).unwrap();
assert_eq!(bind_to.port_range, Some((0, 0)));

// low > high is error
assert!(bind_to.set_port_range(Some((2000, 1000))).is_err());

// low < high success
bind_to.set_port_range(Some((1000, 2000))).unwrap();
assert_eq!(bind_to.port_range, Some((1000, 2000)));
}
}
6 changes: 3 additions & 3 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::server::configuration::ServerConf;
use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};

use l4::connect as l4_connect;
pub use l4::Connect as L4Connect;
use l4::{connect as l4_connect, BindTo};
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
Expand Down Expand Up @@ -273,7 +273,7 @@ impl TransportConnector {
// connection timeout if there is one
async fn do_connect<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
bind_to: Option<BindTo>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
Expand All @@ -296,7 +296,7 @@ async fn do_connect<P: Peer + Send + Sync>(
// Perform the actual L4 and tls connection steps with no timeout
async fn do_connect_inner<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
bind_to: Option<BindTo>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
Expand Down
Loading

0 comments on commit d1d7a87

Please sign in to comment.