-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Something's wrong with tokio runtime or tokio::net::UdpSocket, independent socket performace depends on each other, more sockets makes each FASTER #5795
Comments
Can you post some code I can use to reproduce? |
Here is test code "client", usage: "./this ::1:12345 &"(v4 127.0.0.1 also works)(& to run multiple instance(main issue manifests here), server is fully multithreaded without lock, follow later), this single thread socket receiver along can process 30Gbps+(2.5Mpps+) thanks to recvmmsg, maybe tokio should consider support it, it's another issue though. #define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <string.h> //memset
#include <time.h>
#define MAXMSGSIZE 32
#define DATASIZEPERDG 1500
long packageProcessed;
size_t recv_dg_len = 1450;
void *ReportProcessedPackages(){
struct timespec oneSec;
oneSec.tv_sec = 1;
oneSec.tv_nsec = 0;
long previousSec = 0;
while(1){
nanosleep(&oneSec,NULL);
printf("Receive Previous Sec:%ld, Bandwidth:%ldMbps, Total:%ld\n",packageProcessed - previousSec,(packageProcessed - previousSec)*recv_dg_len*8/1024/1024,packageProcessed);
previousSec = packageProcessed;
fflush(stdout);
}
}
int main(int argc, char *argv[]){
srand(time(NULL));
if (argc != 2) {
printf("Usage: %s <IPv4 or IPv6 address:port>\n", argv[0]);
exit(EXIT_FAILURE);
}
char *port_str = strrchr(argv[1], ':');
if (port_str == NULL) {
printf("Invalid format. Expected <address:port>\n");
exit(EXIT_FAILURE);
}
// Null-terminate the address and advance the port_str to the port
*port_str = '\0';
port_str++;
char *address = argv[1];
int port = atoi(port_str);
struct sockaddr_in6 server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin6_family = AF_INET6;
server_addr.sin6_port = htons(port);
if (inet_pton(AF_INET, address, &server_addr.sin6_addr.s6_addr[12]) == 1) {
printf("Assuming IPv4 address\n");
memset(&server_addr.sin6_addr.s6_addr[0], 0, 10);
memset(&server_addr.sin6_addr.s6_addr[10], 0xff, 2);
} else {
printf("Assuming IPv6 address\n");
if (inet_pton(AF_INET6, address, &server_addr.sin6_addr) != 1) {
printf("Invalid address\n");
exit(EXIT_FAILURE);
}
}
int sock_fd = socket(AF_INET6, SOCK_DGRAM, 0);
if (sock_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in6 my_addr;
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin6_family = AF_INET6;
my_addr.sin6_addr = in6addr_any;
my_addr.sin6_port = htons(0);
if (bind(sock_fd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
char buffer[DATASIZEPERDG];
for (int i = 0; i < DATASIZEPERDG; i++) {
buffer[i] = rand();
}
for (int i = 0; i < 1; i++) {
if (sendto(sock_fd, buffer, 888, 0, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("sendto failed");
exit(EXIT_FAILURE);
}
printf("UDP message sent to %s:%d\n", address, port);
}
pthread_t reportT;
pthread_create(&reportT,NULL,ReportProcessedPackages,NULL);
struct mmsghdr Msgs[MAXMSGSIZE];
memset(Msgs,0,MAXMSGSIZE * sizeof(struct mmsghdr));
struct iovec iovecs[MAXMSGSIZE];
for(unsigned int i=0; i<MAXMSGSIZE; i++){
iovecs[i].iov_base =malloc(DATASIZEPERDG);
iovecs[i].iov_len =DATASIZEPERDG;
}
for(unsigned int i=0;i<MAXMSGSIZE;i++){
Msgs[i].msg_hdr.msg_name =&server_addr;
Msgs[i].msg_hdr.msg_namelen=sizeof(struct sockaddr_in6);
// Msgs[i].msg_hdr.msg_namelen=0;
Msgs[i].msg_hdr.msg_iovlen =1;
Msgs[i].msg_hdr.msg_iov =&(iovecs[i]);
}
while(1){
int r = recvmmsg(sock_fd, Msgs, MAXMSGSIZE, MSG_WAITFORONE, NULL);
if (r > 0) {
recv_dg_len = Msgs[0].msg_len;
}
packageProcessed += r;
}
pthread_join(reportT,NULL);
return 0;
} |
Server side code, dep tokio/clap/rand/socket2, run with "cargo run --release -- --listen-port 57777 --payload-size 1450 --threads 1": use clap::Parser;
use tokio::net::UdpSocket;
use tokio::time::Instant;
use std::net::{IpAddr, SocketAddr, SocketAddrV6};
use rand::{SeedableRng, RngCore};
use rand::rngs::StdRng;
use socket2::{Domain, Protocol, Socket, Type};
// 20 seconds of 1500 bytes for 3 Gbps
const COUNT_PER_CLIENT: usize = 5000000;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Config {
#[arg(short, long)]
pub listen_port: u16,
#[arg(short, long)]
pub payload_size: u16,
#[arg(short, long)]
pub threads: usize
}
#[tokio::main]
async fn main() {
let config = Config::parse();
let server_socket = UdpSocket::from_std(create_udp_socket_bind_reuse_addr(format!("[::]:{}", config.listen_port).parse::<SocketAddrV6>().expect("never fail"), false).expect("Binding local listening port")).unwrap();
loop {
let mut buf = Vec::with_capacity(1500);
match server_socket.recv_buf_from(&mut buf).await {
Ok((psize, client_addr)) => {
for _ in 0..config.threads {
println!("Get {} bytes from {}:{}", psize, trim_v6_map(client_addr.ip()), client_addr.port());
let server_socket = UdpSocket::from_std(create_udp_socket_bind_reuse_addr(format!("[::]:{}", config.listen_port).parse::<SocketAddrV6>().expect("never fail"), false).expect("Binding local listening port")).unwrap();
tokio::task::spawn(flood(server_socket, client_addr, config.payload_size, COUNT_PER_CLIENT));
}
},
Err(e) => {
eprintln!("main listening port recv err: {}", e);
continue;
},
}
}
}
async fn flood(socket: UdpSocket, target: SocketAddr, payload_size: u16, count: usize) {
if let Err(e) = socket.connect(target).await {
eprintln!("connecting to {} failed: {}", target, e);
return;
} else {
println!("start sending {} {} bytes packets to {}", count, payload_size, target);
}
let mut rng = StdRng::from_rng(rand::thread_rng()).unwrap();
let mut data = vec![0; payload_size as usize];
let start = Instant::now();
for _ in 0..count {
rng.fill_bytes(&mut data);
let _ = socket.send(&data).await;
}
let bandwidth = count * payload_size as usize / 1000 / 1000 * 8; // to Mb
println!("{} Mb sent to {}:{} in {:?}, {} Mbps", bandwidth, trim_v6_map(target.ip()), target.port(), start.elapsed(), bandwidth as f64 / start.elapsed().as_secs_f64());
}
fn trim_v6_map(potentially_mapped_addr: IpAddr) -> IpAddr {
match potentially_mapped_addr {
IpAddr::V4(_) => {
potentially_mapped_addr
},
IpAddr::V6(addr) => {
if let Some(v4addr) = addr.to_ipv4_mapped() {
IpAddr::V4(v4addr)
} else {
potentially_mapped_addr
}
},
}
}
enum IsIPv4{
True(std::net::Ipv4Addr),
False
}
fn is_ipv4_or_ipv4_mapped_in_ipv6(ip: IpAddr) -> IsIPv4 {
match ip {
IpAddr::V4(i) => {
IsIPv4::True(i)
},
IpAddr::V6(i) => {
if let Some(ip) = i.to_ipv4_mapped() {
IsIPv4::True(ip)
} else {
IsIPv4::False
}
},
}
}
fn create_udp_socket_bind_reuse_addr<A: std::net::ToSocketAddrs>(addr: A, nonblocking: bool) -> Result<std::net::UdpSocket, std::io::Error> {
let addrs = addr.to_socket_addrs()?;
let mut last_err = None;
for addr in addrs {
let domain = match is_ipv4_or_ipv4_mapped_in_ipv6(addr.ip()) {
IsIPv4::True(_) => Domain::IPV4,
IsIPv4::False => Domain::IPV6
};
let socket = match Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) {
Ok(socket) => socket,
Err(e) => {
last_err = Some(e);
continue;
},
};
match socket.set_nonblocking(nonblocking) {
Ok(_) => {},
Err(e) => {
last_err = Some(e);
continue;
},
};
#[cfg(target_family = "unix")]
match socket.set_reuse_port(true) {
Ok(_) => {
// println!("set_reuse_port success")
},
Err(e) => {
last_err = Some(e);
continue;
},
}
#[cfg(not(target_family = "unix"))]
match socket.set_reuse_address(true) {
Ok(_) => {
// println!("set_reuse_address success")
},
Err(e) => {
last_err = Some(e);
continue;
},
};
match socket.bind(&socket2::SockAddr::from(addr.to_owned())) {
Ok(_) => return Ok(socket.into()),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
} Replace with std::net::UdpSocket and std::thread::spawn(move || flood()) you'll get expected behavior. |
LOL 🧵 |
Also if you run only one client then immediately close it, you'll get server side send performance reporting not 1Gbps but 3.5-4Gbps(actual speed too even it's discarded by kernel recv side), tokio internally is waiting something/busy waiting some error/confirmation, is it related to inner socket configuration, or how it's polled? Maybe test on two VM instead of one. |
Update: after taskset server to a ONE core, performance gets to consistent 4.3Gbps, even weirder it'll seemingly randomly get to STABLE 6.4-6.9Gbps latter half of the 5M test packets: This is when server set to ONE core: Besides that random speed bump. Also notice how two receiver/client will starve one which is also might be an issue to consider for, the scheduler basically made async flood() a blocking task. Set to two cores make one under 1Gbps again. |
FYI I've updated your comments to have syntax highlighting (in the opening line, you use ```rust or ```c). I'll take a look soon at this. |
Any update? This feels like a quite deep and high prio issue of the task scheduler or UdpSocket polling mechanism(?) in tokio. |
I don't have any update. I have not looked into it. |
I'll take a look at this today and see if this reproduces on my Linux system. If it does, I'll dig in more. |
@Originalimoc I had to modify both the rust and the C++ code a bit to get things to compile (gcc 13.2.1, latest rust). After a few tweaks to your code so that the server would keep sending to make testing a bit easier, I do not observe on my end the performance of a client-server socket pair increasing as more clients are added. In fact, I observe the opposite as expected. Adding more clients gradually diminishes throughput. This checks out, as the performance of the program appears to be primarily bounded by the performance of the write syscall and the rng, so as more clients are added, the CPU resources of the program are gradually occupied more and more, and per-client throughput decreases. |
Later, I'll do a retest again both on WSL and Debian 12/6.1. |
Wondering what you've needed to change to compile, "Debian 12.2 gcc -o client client.c -O2 -lpthread" compiled, and Rust 1.72b cargo build with this Cargo.toml(no Cargo.lock so latest) also directly compiled: [package]
name = "server"
version = "0.1.0"
edition = "2021"
[dependencies]
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
socket2 = "0.5"
rand = "0.8" Compiling aside, I just tested one session of two seperate VM(one client one server, on same host) with Debian 12, results are concerning slow(~900Mbps, vNIC limit?), Then I put server(flood()) on a Windows 11 VM(a guest of the same host), now 200Mbps(...OK(confuse face)?) Then I put server and client on same Debian VM(Both are 8 core, pinned VM core, not shared, same above, this factor is considered) with this sysctl:
One client get ~1600Mbps, then firing up 2 more, each of them goes up to 1700-1800Mbps, so confirming this issue is not fixed on Rust 1.72 x Tokio 1.31. Also I'm also rewriting server side in native pthread C, later |
Oh I guess you treat it as C++ code gcc also cpp suffix, hmmm no It's C code(looks like function pointer type doesn't constraining the same way)😅. What about Rust side then, derive of clap not enabled? |
Oh I realized now what was going on. I forgot that |
spoiler: WSL2 socket is blazing fast |
Debian VM loopback 2600Mbps vs WSL2 11000Mbps...(When that C macro THREADS set to 1, when THREADS set to 4 client side single recvmmsg() get saturated at 37Gbps(WSL2)), (I also have no idea how it goes from 7Gbps to 11Gbps and 30Gbps to 37Gbps, guess no RNG? But that's only flood() side, client recvmmsg also get faster.) #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <fcntl.h>
#include <time.h>
#define COUNT_PER_CLIENT 2000000
#define PAYLOAD_SIZE 1450
#define THREADS 1
#define LISTEN_PORT 12345
struct client_data {
struct sockaddr_in client_addr;
};
void *flood(void *arg) {
struct client_data *data = (struct client_data *)arg;
int sock = socket(PF_INET, SOCK_DGRAM, 0);
char payload[PAYLOAD_SIZE];
struct timespec start_time, end_time;
clock_gettime(CLOCK_REALTIME, &start_time);
for (int i = 0; i < COUNT_PER_CLIENT; i++) {
sendto(sock, payload, PAYLOAD_SIZE, 0, (struct sockaddr *)&data->client_addr, sizeof(data->client_addr));
}
clock_gettime(CLOCK_REALTIME, &end_time);
double elapsed_seconds = end_time.tv_sec - start_time.tv_sec;
long elapsed_nanoseconds = end_time.tv_nsec - start_time.tv_nsec;
if (elapsed_nanoseconds < 0) {
elapsed_seconds -= 1;
elapsed_nanoseconds += 1000000000;
}
double time_spend = elapsed_seconds + elapsed_nanoseconds * 1e-9;
double bandwidth = (double) COUNT_PER_CLIENT * PAYLOAD_SIZE / time_spend / 1000000 * 8; // to Mbps
printf("Sent %ld MB to %s:%d in %f seconds, %f Mbps\n", (long)COUNT_PER_CLIENT * PAYLOAD_SIZE / 1000000, inet_ntoa(data->client_addr.sin_addr), ntohs(data->client_addr.sin_port), time_spend, bandwidth);
close(sock);
free(data);
return NULL;
}
int main() {
int server_socket;
struct sockaddr_in server_addr, client_addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
server_socket = socket(PF_INET, SOCK_DGRAM, 0);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(LISTEN_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
memset(server_addr.sin_zero, '\0', sizeof server_addr.sin_zero);
bind(server_socket, (struct sockaddr *) &server_addr, sizeof(server_addr));
while(1) {
char buffer[PAYLOAD_SIZE];
int recv_len = recvfrom(server_socket, buffer, PAYLOAD_SIZE, 0, (struct sockaddr*)&client_addr, &addr_size);
printf("Received packet from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
pthread_t threads[THREADS];
for(int i = 0; i < THREADS; i++) {
struct client_data *data = malloc(sizeof(struct client_data));
data->client_addr = client_addr;
pthread_create(&threads[i], NULL, flood, (void *)data);
printf("Launched thread %d\n", i);
}
}
return 0;
} And for testing again plus comparing to C version, I removed RNG like this let mut rng = StdRng::from_rng(rand::thread_rng()).unwrap();
let mut data = vec![0; payload_size as usize];
rng.fill_bytes(&mut data);
let start = Instant::now();
for _ in 0..count {
let _ = socket.send(&data).await;
} Then I get this(on WSL2):
2 simutanious client side partial log when taskset -c 0,1,2,3 server is running:
So it's almost as fast as C version when this bug does not manifest. My only guess is your testing machine is not multicore? I don't have more info now. Already as detailed. Also is that nonblock from std::UdpSocket to tokio::UdpSocket actually necessary? This program nature is blocking, how nonblock helps here, how tokio poll it? Is overhead cost considered? I used Linux epoll extensively before and had not set that flag. |
Hoping for a follow-up. |
Still there |
Version
v1.28.1
Platform
Win11 WSL2 Ubuntu 22.04 16C32T/2CCD AMD
Description
Something's wrong with tokio task scheduler or tokio::net::UdpSocket, or WSL2 socket, or...?
Info: Independent socket, tokio-runtime CPU while testing is at 100%
A loop { socket.send().await }, either from a standalone current_runtime() or main [tokio::main] runtime, it is faster when more of them are running(1 only sending 1Gbps, but up to 5 of them then get 5x5Gbps then stay at around 5-7Gbps per socket)
[/short summary of the bug]
I tried this code:
[Later I'll upload both code to my GH page. Meanwhile any idea why is this possible?]
I expected to see this happen: Socket performance independent of each other, because I then tested std::net::UdpSocket and it is. 5-7Gbps each no matter connection count.
Instead, this happened: More is faster??? Some unexpected kernel socket behavior?
The text was updated successfully, but these errors were encountered: