Skip to content

Commit 276476e

Browse files
committed
feat(gateway): support multi-port TCP listening via port ranges
Allow listen_port to accept either a single port (8443) or a range string ("8443-8543") to bind multiple TCP listeners, avoiding ephemeral port exhaustion under high connection counts. Also removes the single-port field from HostInfo proto since listen_port is now a range, and uses {e:#} for cleaner error logs.
1 parent dd3ed5f commit 276476e

File tree

4 files changed

+66
-20
lines changed

4 files changed

+66
-20
lines changed

gateway/rpc/proto/gateway_rpc.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ message HostInfo {
7777
string app_id = 3;
7878
// The base domain of the HTTPS endpoint of the host.
7979
string base_domain = 4;
80-
// The external port of the host.
81-
uint32 port = 5;
8280
// The latest handshake time of the host.
8381
uint64 latest_handshake = 6;
8482
// The number of connections of the host.

gateway/src/admin_service.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ impl AdminRpcHandler {
3535
ip: instance.ip.to_string(),
3636
app_id: instance.app_id.clone(),
3737
base_domain: base_domain.clone(),
38-
port: state.config.proxy.listen_port as u32,
3938
latest_handshake: encode_ts(instance.last_seen),
4039
num_connections: instance.num_connections(),
4140
})
@@ -97,7 +96,6 @@ impl AdminRpc for AdminRpcHandler {
9796
ip: instance.ip.to_string(),
9897
app_id: instance.app_id.clone(),
9998
base_domain: base_domain.clone(),
100-
port: state.config.proxy.listen_port as u32,
10199
latest_handshake: {
102100
let (ts, _) = handshakes
103101
.get(&instance.public_key)

gateway/src/config.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,40 @@ pub enum TlsVersion {
6767
Tls13,
6868
}
6969

70+
/// Deserialize a port range from either a single integer (443) or a string range ("443-543").
71+
fn deserialize_port_range<'de, D>(deserializer: D) -> std::result::Result<Vec<u16>, D::Error>
72+
where
73+
D: serde::Deserializer<'de>,
74+
{
75+
use serde::de;
76+
77+
#[derive(Deserialize)]
78+
#[serde(untagged)]
79+
enum PortSpec {
80+
Single(u16),
81+
Range(String),
82+
}
83+
84+
match PortSpec::deserialize(deserializer)? {
85+
PortSpec::Single(p) => Ok(vec![p]),
86+
PortSpec::Range(s) => {
87+
if let Some((start, end)) = s.split_once('-') {
88+
let start: u16 = start.trim().parse().map_err(de::Error::custom)?;
89+
let end: u16 = end.trim().parse().map_err(de::Error::custom)?;
90+
if start > end {
91+
return Err(de::Error::custom(format!(
92+
"invalid port range: {start} > {end}"
93+
)));
94+
}
95+
Ok((start..=end).collect())
96+
} else {
97+
let p: u16 = s.trim().parse().map_err(de::Error::custom)?;
98+
Ok(vec![p])
99+
}
100+
}
101+
}
102+
}
103+
70104
#[derive(Debug, Clone, Deserialize)]
71105
pub struct ProxyConfig {
72106
pub cert_chain: String,
@@ -76,7 +110,8 @@ pub struct ProxyConfig {
76110
pub base_domain: String,
77111
pub external_port: u16,
78112
pub listen_addr: Ipv4Addr,
79-
pub listen_port: u16,
113+
#[serde(deserialize_with = "deserialize_port_range")]
114+
pub listen_port: Vec<u16>,
80115
pub agent_port: u16,
81116
pub timeouts: Timeouts,
82117
pub buffer_size: usize,

gateway/src/proxy.rs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
atomic::{AtomicU64, AtomicUsize, Ordering},
99
Arc,
1010
},
11+
task::Poll,
1112
};
1213

1314
use anyhow::{bail, Context, Result};
@@ -173,21 +174,35 @@ pub async fn proxy_main(config: &ProxyConfig, proxy: Proxy) -> Result<()> {
173174
let base_domain = base_domain.strip_prefix(".").unwrap_or(base_domain);
174175
Arc::new(format!(".{base_domain}"))
175176
};
176-
let listener = TcpListener::bind((config.listen_addr, config.listen_port))
177-
.await
178-
.with_context(|| {
179-
format!(
180-
"failed to bind {}:{}",
181-
config.listen_addr, config.listen_port
182-
)
183-
})?;
184-
info!(
185-
"tcp bridge listening on {}:{}",
186-
config.listen_addr, config.listen_port
187-
);
177+
let mut tcp_listeners = Vec::new();
178+
for &port in &config.listen_port {
179+
let listener = TcpListener::bind((config.listen_addr, port))
180+
.await
181+
.with_context(|| format!("failed to bind {}:{}", config.listen_addr, port))?;
182+
info!("tcp bridge listening on {}:{}", config.listen_addr, port);
183+
tcp_listeners.push(listener);
184+
}
185+
if tcp_listeners.is_empty() {
186+
bail!("no tcp listen ports configured");
187+
}
188188

189+
let poll_counter = AtomicUsize::new(0);
189190
loop {
190-
match listener.accept().await {
191+
// Accept from any TCP listener via round-robin poll.
192+
let poll_start = poll_counter.fetch_add(1, Ordering::Relaxed);
193+
let n = tcp_listeners.len();
194+
let accepted: std::io::Result<(TcpStream, std::net::SocketAddr)> =
195+
std::future::poll_fn(|cx| {
196+
for j in 0..n {
197+
let i = (poll_start + j) % n;
198+
if let Poll::Ready(result) = tcp_listeners[i].poll_accept(cx) {
199+
return Poll::Ready(result);
200+
}
201+
}
202+
Poll::Pending
203+
})
204+
.await;
205+
match accepted {
191206
Ok((inbound, from)) => {
192207
let span = info_span!("conn", id = next_connection_id());
193208
let _enter = span.enter();
@@ -210,7 +225,7 @@ pub async fn proxy_main(config: &ProxyConfig, proxy: Proxy) -> Result<()> {
210225
info!("connection closed");
211226
}
212227
Ok(Err(e)) => {
213-
error!("connection error: {e:?}");
228+
error!("connection error: {e:#}");
214229
}
215230
Err(_) => {
216231
error!("connection kept too long, force closing");
@@ -245,7 +260,7 @@ pub fn start(config: ProxyConfig, app_state: Proxy) -> Result<()> {
245260
// Run the proxy_main function in this runtime
246261
if let Err(err) = rt.block_on(proxy_main(&config, app_state)) {
247262
error!(
248-
"error on {}:{}: {err:?}",
263+
"error on {}:{:?}: {err:?}",
249264
config.listen_addr, config.listen_port
250265
);
251266
}

0 commit comments

Comments
 (0)