Skip to content

Commit 82b599e

Browse files
committed
support setting recv/send timeout for tcp and udp connections
1 parent ece6484 commit 82b599e

File tree

9 files changed

+69
-28
lines changed

9 files changed

+69
-28
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "omnip"
3-
version = "0.6.3"
3+
version = "0.6.4"
44
edition = "2021"
55

66
[lib]
@@ -28,7 +28,7 @@ lazy_static = "1.4"
2828
async-trait = "0.1"
2929
byte-pool = { git = "https://github.com/neevek/byte-pool" }
3030
# rstun = { path = "../rstun" }
31-
rstun = { git = "https://github.com/neevek/rstun", tag = "release/0.6.4" }
31+
rstun = { git = "https://github.com/neevek/rstun", tag = "release/0.6.5" }
3232
hyper = { version = "0.14", features = ["full"]}
3333
http = "0.2"
3434
http-body = "0.4"

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,13 @@ Options:
122122
-e, --cipher <CIPHER>
123123
Applicable only for +quic protocols
124124
Cipher for encryption [default: chacha20-poly1305] [possible values: chacha20-poly1305, aes-256-gcm, aes-128-gcm]
125-
-i, --max-idle-timeout-ms <MAX_IDLE_TIMEOUT_MS>
125+
-i, --quic-timeout-ms <QUIC_TIMEOUT_MS>
126126
Applicable only for quic protocol as upstream
127127
Max idle timeout for the QUIC connections [default: 120000]
128+
--tcp-timeout-ms <TCP_TIMEOUT_MS>
129+
Read timeout in milliseconds for TCP connections [default: 30000]
130+
--udp-timeout-ms <UDP_TIMEOUT_MS>
131+
Read timeout in milliseconds for UDP connections [default: 5000]
128132
-R, --retry-interval-ms <RETRY_INTERVAL_MS>
129133
Applicable only for quic protocol as upstream
130134
Max idle timeout for the QUIC connections [default: 5000]

src/bin/omnip.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ fn main() -> Result<()> {
3131
args.threads,
3232
args.watch_proxy_rules_change,
3333
args.tcp_nodelay,
34+
args.tcp_timeout_ms,
35+
args.udp_timeout_ms,
3436
)?;
3537

3638
let common_quic_config = CommonQuicConfig {
3739
cert: args.cert,
3840
key: args.key,
3941
password: args.password,
4042
cipher: args.cipher,
41-
max_idle_timeout_ms: args.max_idle_timeout_ms,
43+
quic_timeout_ms: args.quic_timeout_ms,
44+
tcp_timeout_ms: args.tcp_timeout_ms,
45+
udp_timeout_ms: args.udp_timeout_ms,
4246
retry_interval_ms: args.retry_interval_ms,
4347
workers: args.threads,
4448
};
@@ -156,7 +160,15 @@ struct OmnipArgs {
156160
/// Applicable only for quic protocol as upstream
157161
/// Max idle timeout for the QUIC connections
158162
#[arg(short = 'i', long, verbatim_doc_comment, default_value = "120000")]
159-
max_idle_timeout_ms: u64,
163+
quic_timeout_ms: u64,
164+
165+
/// Read timeout in milliseconds for TCP connections
166+
#[arg(long, verbatim_doc_comment, default_value = "30000")]
167+
tcp_timeout_ms: u64,
168+
169+
/// Read timeout in milliseconds for UDP connections
170+
#[arg(long, verbatim_doc_comment, default_value = "5000")]
171+
udp_timeout_ms: u64,
160172

161173
/// Applicable only for quic protocol as upstream
162174
/// Max idle timeout for the QUIC connections

src/lib.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ pub struct Config {
319319
pub name_servers: String,
320320
pub watch_proxy_rules_change: bool,
321321
pub tcp_nodelay: bool,
322+
pub tcp_timeout_ms: u64,
323+
pub udp_timeout_ms: u64,
322324
}
323325

324326
#[derive(Debug)]
@@ -353,7 +355,9 @@ pub struct CommonQuicConfig {
353355
pub key: String,
354356
pub cipher: String,
355357
pub password: String,
356-
pub max_idle_timeout_ms: u64,
358+
pub quic_timeout_ms: u64,
359+
pub tcp_timeout_ms: u64,
360+
pub udp_timeout_ms: u64,
357361
pub retry_interval_ms: u64,
358362
pub workers: usize,
359363
}
@@ -494,6 +498,8 @@ pub fn create_config(
494498
workers: usize,
495499
watch_proxy_rules_change: bool,
496500
tcp_nodelay: bool,
501+
tcp_timeout_ms: u64,
502+
udp_timeout_ms: u64,
497503
) -> Result<Config> {
498504
let server_addr = match parse_server_addr(&server_addr)? {
499505
Some(server_addr) => server_addr,
@@ -558,6 +564,8 @@ pub fn create_config(
558564
name_servers,
559565
watch_proxy_rules_change,
560566
tcp_nodelay,
567+
tcp_timeout_ms,
568+
udp_timeout_ms
561569
})
562570
}
563571

@@ -611,7 +619,7 @@ pub mod android {
611619
jkey: JString,
612620
jcipher: JString,
613621
jpassword: JString,
614-
jmaxIdleTimeoutMs: jint,
622+
jquicTimeoutMs: jint,
615623
jretryIntervalMs: jint,
616624
jworkers: jint,
617625
jtcpNoDelay: jboolean,
@@ -652,7 +660,7 @@ pub mod android {
652660
key,
653661
password,
654662
cipher,
655-
max_idle_timeout_ms: jmaxIdleTimeoutMs as u64,
663+
quic_timeout_ms: jquicTimeoutMs as u64,
656664
retry_interval_ms: jretryIntervalMs as u64,
657665
workers: jworkers as usize,
658666
};

src/quic/quic_client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ impl QuicClient {
5757
config.password = quic_client_config.common_cfg.password.clone();
5858
config.cert_path = quic_client_config.common_cfg.cert.clone();
5959
config.cipher = quic_client_config.common_cfg.cipher.clone();
60-
config.max_idle_timeout_ms = quic_client_config.common_cfg.max_idle_timeout_ms;
60+
config.quic_timeout_ms = quic_client_config.common_cfg.quic_timeout_ms;
61+
config.tcp_timeout_ms = quic_client_config.common_cfg.tcp_timeout_ms;
62+
config.udp_timeout_ms = quic_client_config.common_cfg.udp_timeout_ms;
6163
config.wait_before_retry_ms = quic_client_config.common_cfg.retry_interval_ms;
6264
config.workers = quic_client_config.common_cfg.workers;
6365
config.local_tcp_server_addr = quic_client_config.local_tcp_server_addr;

src/quic/quic_server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ impl QuicServer {
2020
password: quic_server_config.common_cfg.password.to_string(),
2121
cert_path: quic_server_config.common_cfg.cert.to_string(),
2222
key_path: quic_server_config.common_cfg.key.to_string(),
23-
max_idle_timeout_ms: quic_server_config.common_cfg.max_idle_timeout_ms,
23+
quic_timeout_ms: quic_server_config.common_cfg.quic_timeout_ms,
24+
tcp_timeout_ms: quic_server_config.common_cfg.tcp_timeout_ms,
25+
udp_timeout_ms: quic_server_config.common_cfg.udp_timeout_ms,
2426
default_tcp_upstream: quic_server_config.tcp_upstream,
2527
default_udp_upstream: quic_server_config.udp_upstream,
2628
dashboard_server: "".to_string(),

src/server.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,9 @@ impl Server {
332332
use_sync: bool,
333333
) -> Result<UdpServer> {
334334
let upstream_addr = inner_state!(self, udp_upstream).unwrap();
335-
let udp_server = UdpServer::bind_and_start(addr, upstream_addr, use_sync).await?;
335+
let udp_server =
336+
UdpServer::bind_and_start(addr, upstream_addr, use_sync, self.config.udp_timeout_ms)
337+
.await?;
336338

337339
let udp_server_addr = udp_server.local_addr().unwrap();
338340
inner_state!(self, udp_upstream) = Some(udp_server_addr);
@@ -491,15 +493,9 @@ impl Server {
491493
proxy_rule_manager: inner_state!(self, proxy_rule_manager).clone(),
492494
stats_sender,
493495
tcp_nodelay: cfg.tcp_nodelay,
496+
tcp_timeout_ms: cfg.tcp_timeout_ms,
494497
});
495498

496-
// if let Some(p) = cfg.server_addr.proto {
497-
// if p.is_udp_supported() {
498-
// let addr = cfg.upstream_udp.unwrap_or(cfg.upstream_addr);
499-
// let udp_server = UdpServer::bind_and_start(cfg.addr, upstream_addr, false).await;
500-
// }
501-
// }
502-
503499
loop {
504500
match proxy_listener.accept().await {
505501
Ok((inbound_stream, _addr)) => {
@@ -532,6 +528,7 @@ impl Server {
532528
inbound_stream,
533529
outbound_stream,
534530
&psp.stats_sender,
531+
psp.tcp_timeout_ms,
535532
)
536533
.await
537534
.ok();
@@ -789,6 +786,7 @@ impl Server {
789786
inbound_stream,
790787
outbound_stream,
791788
&params.stats_sender,
789+
params.tcp_timeout_ms,
792790
)
793791
.await?;
794792
}
@@ -863,6 +861,7 @@ impl Server {
863861
mut inbound_stream: TcpStream,
864862
mut outbound_stream: TcpStream,
865863
stats_sender: &Sender<ServerStats>,
864+
tcp_timeout_ms: u64,
866865
) -> Result<ProxyTraffic, ProxyError> {
867866
stats_sender.send(ServerStats::NewConnection).await.ok();
868867

@@ -897,13 +896,15 @@ impl Server {
897896
&mut outbound_writer,
898897
&mut inbound_buffer,
899898
&mut tx_bytes,
900-
&mut inbound_stream_eos) => result,
899+
&mut inbound_stream_eos,
900+
tcp_timeout_ms) => result,
901901
result = Self::transfer_data_with_timeout(
902902
&mut outbound_reader,
903903
&mut inbound_writer,
904904
&mut outbound_buffer,
905905
&mut rx_bytes,
906-
&mut outbound_stream_eos) => result,
906+
&mut outbound_stream_eos,
907+
tcp_timeout_ms) => result,
907908
}
908909
} else if !outbound_stream_eos {
909910
Self::transfer_data_with_timeout(
@@ -912,6 +913,7 @@ impl Server {
912913
&mut outbound_buffer,
913914
&mut rx_bytes,
914915
&mut outbound_stream_eos,
916+
tcp_timeout_ms,
915917
)
916918
.await
917919
} else {
@@ -921,6 +923,7 @@ impl Server {
921923
&mut inbound_buffer,
922924
&mut tx_bytes,
923925
&mut inbound_stream_eos,
926+
tcp_timeout_ms,
924927
)
925928
.await
926929
};
@@ -956,12 +959,13 @@ impl Server {
956959
buffer: &mut [u8],
957960
out_bytes: &mut u64,
958961
eos_flag: &mut bool,
962+
tcp_timeout_ms: u64,
959963
) -> Result<usize, ProxyError>
960964
where
961965
R: AsyncRead + Unpin,
962966
W: AsyncWrite + Unpin,
963967
{
964-
match tokio::time::timeout(Duration::from_secs(15), reader.read(buffer))
968+
match tokio::time::timeout(Duration::from_millis(tcp_timeout_ms), reader.read(buffer))
965969
.await
966970
.map_err(|_: Elapsed| ProxyError::Timeout)?
967971
{
@@ -1206,7 +1210,7 @@ impl Api for Server {
12061210
cert: cfg.cert.clone(),
12071211
cipher: cfg.cipher.clone(),
12081212
password: cfg.password.clone(),
1209-
idle_timeout: cfg.max_idle_timeout_ms,
1213+
idle_timeout: cfg.quic_timeout_ms,
12101214
retry_interval: cfg.retry_interval_ms,
12111215
}
12121216
}
@@ -1252,7 +1256,7 @@ impl Api for Server {
12521256
base_common_quic_config.cert = config.cert;
12531257
base_common_quic_config.cipher = config.cipher;
12541258
base_common_quic_config.password = config.password;
1255-
base_common_quic_config.max_idle_timeout_ms = config.idle_timeout;
1259+
base_common_quic_config.quic_timeout_ms = config.idle_timeout;
12561260
base_common_quic_config.retry_interval_ms = config.retry_interval;
12571261
let upstream_addr = NetAddr::from_str(config.upstream_addr.as_str())?;
12581262
let quic_client_join_handle = self
@@ -1273,4 +1277,5 @@ struct ProxySupportParams {
12731277
proxy_rule_manager: Option<ProxyRuleManager>,
12741278
stats_sender: Sender<ServerStats>,
12751279
tcp_nodelay: bool,
1280+
tcp_timeout_ms: u64,
12761281
}

src/udp/udp_server.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ impl UdpServer {
2424
server_addr: SocketAddr,
2525
upstream_addr: SocketAddr,
2626
use_sync: bool,
27+
udp_timeout_ms: u64,
2728
) -> Result<Self> {
2829
let serv_sock = Arc::new(UdpSocket::bind(server_addr).await?);
2930

@@ -41,7 +42,9 @@ impl UdpServer {
4142
Ok((size, addr)) => {
4243
tokio::spawn(async move {
4344
let state = state.lock().unwrap().clone();
44-
let sock = Self::open_udp_socket(&state, addr, upstream_addr).await?;
45+
let sock =
46+
Self::open_udp_socket(&state, addr, upstream_addr, udp_timeout_ms)
47+
.await?;
4548
sock.send(&buf[..size]).await.ok();
4649
Ok::<(), anyhow::Error>(())
4750
});
@@ -66,6 +69,7 @@ impl UdpServer {
6669
state: &State,
6770
inbound_addr: SocketAddr,
6871
outbound_addr: SocketAddr,
72+
udp_timeout_ms: u64,
6973
) -> Result<Arc<UdpSocket>> {
7074
if let Some(s) = state.sock_map.get(&inbound_addr) {
7175
return Ok((*s).clone());
@@ -87,7 +91,11 @@ impl UdpServer {
8791
);
8892
loop {
8993
let mut buf = BUFFER_POOL.alloc_and_fill(UDP_PACKET_SIZE);
90-
match tokio::time::timeout(Duration::from_secs(5), udp_socket.recv(&mut buf)).await
94+
match tokio::time::timeout(
95+
Duration::from_millis(udp_timeout_ms),
96+
udp_socket.recv(&mut buf),
97+
)
98+
.await
9199
{
92100
Ok(Ok(size)) => {
93101
unsafe {

0 commit comments

Comments
 (0)