
Something's wrong with tokio runtime or tokio::net::UdpSocket: independent sockets' performance depends on each other, more sockets make each one FASTER #5795

Open
Originalimoc opened this issue Jun 14, 2023 · 19 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-net Module: tokio/net

Comments

@Originalimoc

Originalimoc commented Jun 14, 2023

Version
v1.28.1

Platform
Win11 WSL2 Ubuntu 22.04 16C32T/2CCD AMD

Description
Something's wrong with tokio task scheduler or tokio::net::UdpSocket, or WSL2 socket, or...?

Info: the sockets are independent; the tokio-runtime CPU is at 100% while testing.

A loop { socket.send().await }, whether run on a standalone current-thread runtime or on the main #[tokio::main] runtime, gets faster when more of them are running: one alone sends only 1 Gbps, but with up to 5 of them running, each gets about 5 Gbps (5x5 Gbps) and then stays at around 5-7 Gbps per socket (see the minimal sketch below).

I tried this code:
[I'll upload both pieces of code to my GH page later. Meanwhile, any idea why this is possible?]

I expected to see this happen: socket performance independent of each other, because I then tested std::net::UdpSocket and there it is independent: 5-7 Gbps each no matter the connection count.

Instead, this happened: More is faster??? Some unexpected kernel socket behavior?
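
For context, here is a minimal sketch of the send-loop pattern described above (the bind address, target, and payload size are illustrative, not the exact reproduction code, which is posted in the comments below):

use tokio::net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // One such process/task per "client"; the observation is that each one
    // gets faster as more of them run concurrently.
    let socket = UdpSocket::bind("[::]:0").await?;
    socket.connect("[::1]:12345").await?;
    let payload = [0u8; 1450];
    loop {
        socket.send(&payload).await?;
    }
}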

@Originalimoc Originalimoc added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Jun 14, 2023
@Darksonn Darksonn added the M-net Module: tokio/net label Jun 14, 2023
@Noah-Kennedy
Contributor

Can you post some code I can use to reproduce?

@Originalimoc
Author

Originalimoc commented Jun 15, 2023

Here is the test code "client". Usage: "./this ::1:12345 &" (v4 127.0.0.1 also works; the & is to run multiple instances, which is where the main issue manifests). The server is fully multithreaded without locks and follows later. This single-threaded socket receiver alone can process 30 Gbps+ (2.5 Mpps+) thanks to recvmmsg; maybe tokio should consider supporting it, but that's another issue.

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <string.h> //memset
#include <time.h>

#define MAXMSGSIZE 32
#define DATASIZEPERDG 1500
long packageProcessed;
size_t recv_dg_len = 1450;

void *ReportProcessedPackages(void *arg){
    (void)arg; // unused; signature matches what pthread_create expects
    struct timespec oneSec;
    oneSec.tv_sec = 1;
    oneSec.tv_nsec = 0;
    long previousSec = 0;
    while(1){
        nanosleep(&oneSec,NULL);
        printf("Receive Previous Sec:%ld, Bandwidth:%ldMbps, Total:%ld\n",packageProcessed - previousSec,(packageProcessed - previousSec)*recv_dg_len*8/1024/1024,packageProcessed);
        previousSec = packageProcessed;
        fflush(stdout);
    }
}

int main(int argc, char *argv[]){
    srand(time(NULL));
    if (argc != 2) {
        printf("Usage: %s <IPv4 or IPv6 address:port>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    char *port_str = strrchr(argv[1], ':');
    if (port_str == NULL) {
        printf("Invalid format. Expected <address:port>\n");
        exit(EXIT_FAILURE);
    }

    // Null-terminate the address and advance the port_str to the port
    *port_str = '\0';
    port_str++;

    char *address = argv[1];
    int port = atoi(port_str);

    struct sockaddr_in6 server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin6_family = AF_INET6;
    server_addr.sin6_port = htons(port);

    if (inet_pton(AF_INET, address, &server_addr.sin6_addr.s6_addr[12]) == 1) {
        printf("Assuming IPv4 address\n");
        memset(&server_addr.sin6_addr.s6_addr[0], 0, 10);
        memset(&server_addr.sin6_addr.s6_addr[10], 0xff, 2);
    } else {
        printf("Assuming IPv6 address\n");
        if (inet_pton(AF_INET6, address, &server_addr.sin6_addr) != 1) {
            printf("Invalid address\n");
            exit(EXIT_FAILURE);
        }
    }

    int sock_fd = socket(AF_INET6, SOCK_DGRAM, 0);
    if (sock_fd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in6 my_addr;
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin6_family = AF_INET6;
    my_addr.sin6_addr = in6addr_any;
    my_addr.sin6_port = htons(0);

    if (bind(sock_fd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    char buffer[DATASIZEPERDG];
    for (int i = 0; i < DATASIZEPERDG; i++) {
        buffer[i] = rand();
    }
    
    for (int i = 0; i < 1; i++) {
        if (sendto(sock_fd, buffer, 888, 0, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
            perror("sendto failed");
            exit(EXIT_FAILURE);
        }
        printf("UDP message sent to %s:%d\n", address, port);
    }

    pthread_t reportT;
    pthread_create(&reportT,NULL,ReportProcessedPackages,NULL);

    struct mmsghdr Msgs[MAXMSGSIZE];
    memset(Msgs,0,MAXMSGSIZE * sizeof(struct mmsghdr));
    struct iovec iovecs[MAXMSGSIZE];
    for(unsigned int i=0; i<MAXMSGSIZE; i++){
        iovecs[i].iov_base          =malloc(DATASIZEPERDG);
        iovecs[i].iov_len          =DATASIZEPERDG;
    }
    for(unsigned int i=0;i<MAXMSGSIZE;i++){
        Msgs[i].msg_hdr.msg_name   =&server_addr;
        Msgs[i].msg_hdr.msg_namelen=sizeof(struct sockaddr_in6);
    //  Msgs[i].msg_hdr.msg_namelen=0;
        Msgs[i].msg_hdr.msg_iovlen =1;
        Msgs[i].msg_hdr.msg_iov    =&(iovecs[i]);
    }
    while(1){
        int r = recvmmsg(sock_fd, Msgs, MAXMSGSIZE, MSG_WAITFORONE, NULL);
        if (r > 0) {
            recv_dg_len = Msgs[0].msg_len;
            packageProcessed += r; // only count successfully received datagrams
        }
    }
    pthread_join(reportT,NULL);
    return 0;
}

@Originalimoc
Author

Originalimoc commented Jun 15, 2023

Server-side code (dependencies: tokio/clap/rand/socket2), run with "cargo run --release -- --listen-port 57777 --payload-size 1450 --threads 1":

use clap::Parser;
use tokio::net::UdpSocket;
use tokio::time::Instant;
use std::net::{IpAddr, SocketAddr, SocketAddrV6};
use rand::{SeedableRng, RngCore};
use rand::rngs::StdRng;
use socket2::{Domain, Protocol, Socket, Type};

// 20 seconds of 1500 bytes for 3 Gbps
const COUNT_PER_CLIENT: usize = 5000000;

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Config {
    #[arg(short, long)]
    pub listen_port: u16,
    #[arg(short, long)]
    pub payload_size: u16,
    #[arg(short, long)]
    pub threads: usize
}

#[tokio::main]
async fn main() {
    let config = Config::parse();
    let server_socket = UdpSocket::from_std(
        create_udp_socket_bind_reuse_addr(
            format!("[::]:{}", config.listen_port)
                .parse::<SocketAddrV6>()
                .expect("never fail"),
            false,
        )
        .expect("Binding local listening port"),
    )
    .unwrap();

    loop {
        let mut buf = Vec::with_capacity(1500);
        match server_socket.recv_buf_from(&mut buf).await {
            Ok((psize, client_addr)) => {
                for _ in 0..config.threads {
                    println!("Get {} bytes from {}:{}", psize, trim_v6_map(client_addr.ip()), client_addr.port());
                    let server_socket = UdpSocket::from_std(
                        create_udp_socket_bind_reuse_addr(
                            format!("[::]:{}", config.listen_port)
                                .parse::<SocketAddrV6>()
                                .expect("never fail"),
                            false,
                        )
                        .expect("Binding local listening port"),
                    )
                    .unwrap();
                    tokio::task::spawn(flood(server_socket, client_addr, config.payload_size, COUNT_PER_CLIENT));
                }
            },
            Err(e) => {
                eprintln!("main listening port recv err: {}", e);
                continue;
            },
        }
    }
}

async fn flood(socket: UdpSocket, target: SocketAddr, payload_size: u16, count: usize) {
    if let Err(e) = socket.connect(target).await {
        eprintln!("connecting to {} failed: {}", target, e);
        return;
    } else {
        println!("start sending {} {} bytes packets to {}", count, payload_size, target);
    }
    let mut rng = StdRng::from_rng(rand::thread_rng()).unwrap();
    let mut data = vec![0; payload_size as usize];
    let start = Instant::now();
    for _ in 0..count {
        rng.fill_bytes(&mut data);
        let _ = socket.send(&data).await;
    }
    let bandwidth = count * payload_size as usize / 1000 / 1000 * 8; // to Mb
    println!("{} Mb sent to {}:{} in {:?}, {} Mbps", bandwidth, trim_v6_map(target.ip()), target.port(), start.elapsed(), bandwidth as f64 / start.elapsed().as_secs_f64());
}

fn trim_v6_map(potentially_mapped_addr: IpAddr) -> IpAddr {
    match potentially_mapped_addr {
        IpAddr::V4(_) => potentially_mapped_addr,
        IpAddr::V6(addr) => {
            if let Some(v4addr) = addr.to_ipv4_mapped() {
                IpAddr::V4(v4addr)
            } else {
                potentially_mapped_addr
            }
        },
    }
}

enum IsIPv4{
    True(std::net::Ipv4Addr),
    False
}

fn is_ipv4_or_ipv4_mapped_in_ipv6(ip: IpAddr) -> IsIPv4 {
    match ip {
        IpAddr::V4(i) => {
            IsIPv4::True(i)
        },
        IpAddr::V6(i) => {
            if let Some(ip) = i.to_ipv4_mapped() {
                IsIPv4::True(ip)
            } else {
                IsIPv4::False
            }
        },
    }
}

fn create_udp_socket_bind_reuse_addr<A: std::net::ToSocketAddrs>(addr: A, nonblocking: bool) -> Result<std::net::UdpSocket, std::io::Error> {
    let addrs = addr.to_socket_addrs()?;
    let mut last_err = None;
    for addr in addrs {
        let domain = match is_ipv4_or_ipv4_mapped_in_ipv6(addr.ip()) {
            IsIPv4::True(_) => Domain::IPV4,
            IsIPv4::False => Domain::IPV6
        };
        let socket = match Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) {
            Ok(socket) => socket,
            Err(e) => {
                last_err = Some(e);
                continue;
            },
        };
        match socket.set_nonblocking(nonblocking) {
            Ok(_) => {},
            Err(e) => {
                last_err = Some(e);
                continue;
            },
        };
        #[cfg(target_family = "unix")]
        match socket.set_reuse_port(true) {
            Ok(_) => {
                // println!("set_reuse_port success")
            },
            Err(e) => {
                last_err = Some(e);
                continue;
            },
        }
        #[cfg(not(target_family = "unix"))]
        match socket.set_reuse_address(true) {
            Ok(_) => {
                // println!("set_reuse_address success")
            },
            Err(e) => {
                last_err = Some(e);
                continue;
            },
        };
        match socket.bind(&socket2::SockAddr::from(addr.to_owned())) {
            Ok(_) => return Ok(socket.into()),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.unwrap_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "could not resolve to any address",
        )
    }))
}

Replace with std::net::UdpSocket and std::thread::spawn(move || flood(...)) and you'll get the expected behavior (a sketch of that variant follows below).
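
For reference, a minimal sketch of the std-only variant under those assumptions (a plain blocking socket per thread, no async runtime; the function and argument names here are illustrative, not the exact code I ran):

use std::net::{SocketAddr, UdpSocket};

// Blocking equivalent of flood(): one OS thread per client, one socket each.
fn flood_blocking(target: SocketAddr, payload_size: usize, count: usize) {
    // Bind an unspecified address of the same family as the target.
    let bind_addr: SocketAddr = if target.is_ipv4() {
        "0.0.0.0:0".parse().unwrap()
    } else {
        "[::]:0".parse().unwrap()
    };
    let socket = UdpSocket::bind(bind_addr).expect("bind");
    socket.connect(target).expect("connect");
    let data = vec![0u8; payload_size];
    for _ in 0..count {
        let _ = socket.send(&data);
    }
}

fn spawn_flood(target: SocketAddr) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || flood_blocking(target, 1450, 5_000_000))
}

In this version each sender's throughput is independent of how many others are running, which is the behavior I expected from the tokio version as well.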

@Originalimoc
Author

Originalimoc commented Jun 15, 2023

LOL 🧵
Tested on Nightly/Beta toolchains via rustup: std performance peaks at around 7.5 Gbps+ on average, while tokio achieved 6.9 Gbps at best (see below), which is acceptable enough.

@Originalimoc
Author

Originalimoc commented Jun 15, 2023

Also, if you run only one client and then immediately close it, the server-side send performance report shows not 1 Gbps but 3.5-4 Gbps (that is the actual send speed too, even though the kernel discards it on the receive side). So tokio is internally waiting for something or busy-waiting on some error/confirmation. Is it related to the inner socket configuration, or to how it is polled? Maybe test on two VMs instead of one.

@Originalimoc
Author

Originalimoc commented Jun 15, 2023

Update: after tasksetting the server to ONE core, performance becomes a consistent 4.3 Gbps. Even weirder, it seemingly randomly jumps to a STABLE 6.4-6.9 Gbps for the latter half of the 5M test packets:

This is what happens when the server is set to ONE core:

[screenshot: 2023-06-15 14_16_52]

Besides that random speed bump, also notice how two receivers/clients will starve one of them, which might also be an issue worth considering: the scheduler basically turned the async flood() into a blocking task.

Setting it to two cores makes one drop under 1 Gbps again.

@Noah-Kennedy
Contributor

FYI, I've updated your comments to have syntax highlighting (in the opening line of a code block, you use ```rust or ```c). I'll take a look at this soon.

@Originalimoc
Author

Any update? This feels like a quite deep and high-priority issue in tokio's task scheduler or UdpSocket polling mechanism(?).

@Darksonn
Contributor

Darksonn commented Aug 3, 2023

I don't have any update. I have not looked into it.

@Noah-Kennedy
Contributor

I'll take a look at this today and see if this reproduces on my Linux system. If it does, I'll dig in more.

@Noah-Kennedy
Contributor

@Originalimoc I had to modify both the Rust and the C++ code a bit to get things to compile (gcc 13.2.1, latest Rust).

After a few tweaks to your code so that the server would keep sending (to make testing a bit easier), I do not observe the performance of a client-server socket pair increasing on my end as more clients are added.

In fact, I observe the opposite, as expected. Adding more clients gradually diminishes throughput. This checks out: the performance of the program appears to be primarily bounded by the performance of the write syscall and the RNG, so as more clients are added, the program's CPU resources become more and more occupied and per-client throughput decreases.

@Originalimoc
Author

Later, I'll retest on both WSL and Debian 12 (kernel 6.1).

@Originalimoc
Author

Originalimoc commented Aug 14, 2023

I'm wondering what you needed to change to make it compile. On Debian 12.2, "gcc -o client client.c -O2 -lpthread" compiled, and on Rust 1.72 beta, cargo build with this Cargo.toml (no Cargo.lock, so latest dependencies) also compiled directly:

[package]
name = "server"
version = "0.1.0"
edition = "2021"
[dependencies]
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
socket2 = "0.5"
rand = "0.8"

Compiling aside, I just tested one session across two separate VMs (one client, one server, on the same host) with Debian 12, and the results are concerningly slow (~900 Mbps, vNIC limit?). Then I put the server (flood()) on a Windows 11 VM (a guest of the same host): now 200 Mbps (...OK? (confused face))

Then I put the server and client on the same Debian VM (both VMs are 8-core with pinned, non-shared cores, same as above; this factor is accounted for) with this sysctl:

net.core.rmem_default = 1525000
net.core.wmem_default = 1525000
net.core.rmem_max = 50833333
net.core.wmem_max = 50833333

One client gets ~1600 Mbps; then, firing up 2 more, each of them goes up to 1700-1800 Mbps, confirming this issue is not fixed on Rust 1.72 x Tokio 1.31. Also, taskset -c 0 cargo run --release -- --listen-port 12345 --payload-size 1450 --threads 1 gets 2200 Mbps, and tasksetting to more cores (>=2) degrades performance back to 1600. (Note: in the previous tests both client and server were running on the same WSL2 instance, also on the same host.)

I'm also rewriting the server side in native pthread C; more on that later.

@Originalimoc
Author

Originalimoc commented Aug 14, 2023

Oh, I guess you treated it as C++ code (gcc compiles a .cpp suffix as C++ too). Hmmm, no, it's C code (it looks like function pointer types aren't constrained the same way) 😅. What about the Rust side then, was clap's derive feature not enabled?

@Noah-Kennedy
Contributor

Oh, I realize now what was going on. I forgot that recv_buf_from is an actual function under the io-utils feature, which I didn't enable lol. Fortunately, my slight tweaks wouldn't change perf.

@Originalimoc
Author

spoiler: WSL2 socket is blazing fast

@Originalimoc
Author

Originalimoc commented Aug 14, 2023

Debian VM loopback is 2600 Mbps vs WSL2's 11000 Mbps... (that is with the C macro THREADS set to 1; with THREADS set to 4, the client side's single recvmmsg() gets saturated at 37 Gbps (WSL2)). (I also have no idea how it goes from 7 Gbps to 11 Gbps and from 30 Gbps to 37 Gbps; maybe no RNG? But that's only on the flood() side, and yet the client's recvmmsg also gets faster.)
Achieved by using this C server code:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <fcntl.h>
#include <time.h>

#define COUNT_PER_CLIENT 2000000
#define PAYLOAD_SIZE 1450
#define THREADS 1
#define LISTEN_PORT 12345

struct client_data {
    struct sockaddr_in client_addr;
};

void *flood(void *arg) {
    struct client_data *data = (struct client_data *)arg;
    int sock = socket(PF_INET, SOCK_DGRAM, 0);
    char payload[PAYLOAD_SIZE];
    struct timespec start_time, end_time;

    clock_gettime(CLOCK_REALTIME, &start_time);
    for (int i = 0; i < COUNT_PER_CLIENT; i++) {
        sendto(sock, payload, PAYLOAD_SIZE, 0, (struct sockaddr *)&data->client_addr, sizeof(data->client_addr));
    }
    clock_gettime(CLOCK_REALTIME, &end_time);
    double elapsed_seconds = end_time.tv_sec - start_time.tv_sec;
    long elapsed_nanoseconds = end_time.tv_nsec - start_time.tv_nsec;
    if (elapsed_nanoseconds < 0) {
        elapsed_seconds -= 1;
        elapsed_nanoseconds += 1000000000;
    }
    double time_spend = elapsed_seconds + elapsed_nanoseconds * 1e-9;

    double bandwidth = (double) COUNT_PER_CLIENT * PAYLOAD_SIZE / time_spend / 1000000 * 8; // to Mbps

    printf("Sent %ld MB to %s:%d in %f seconds, %f Mbps\n", (long)COUNT_PER_CLIENT * PAYLOAD_SIZE / 1000000, inet_ntoa(data->client_addr.sin_addr), ntohs(data->client_addr.sin_port), time_spend, bandwidth);

    close(sock);
    free(data);
    return NULL;
}

int main() {
    int server_socket;
    struct sockaddr_in server_addr, client_addr;
    socklen_t addr_size = sizeof(struct sockaddr_in);

    server_socket = socket(PF_INET, SOCK_DGRAM, 0);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(LISTEN_PORT);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    memset(server_addr.sin_zero, '\0', sizeof server_addr.sin_zero);
    bind(server_socket, (struct sockaddr *) &server_addr, sizeof(server_addr));

    while(1) {
        char buffer[PAYLOAD_SIZE];
        int recv_len = recvfrom(server_socket, buffer, PAYLOAD_SIZE, 0, (struct sockaddr*)&client_addr, &addr_size);
        printf("Received %d-byte packet from %s:%d\n", recv_len, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

        pthread_t threads[THREADS];

        for(int i = 0; i < THREADS; i++) {
            struct client_data *data = malloc(sizeof(struct client_data));
            data->client_addr = client_addr;
            pthread_create(&threads[i], NULL, flood, (void *)data);
            printf("Launched thread %d\n", i);
        }
    }

    return 0;
}

And for testing again and comparing against the C version, I removed the RNG from the loop like this:

    let mut rng = StdRng::from_rng(rand::thread_rng()).unwrap();
    let mut data = vec![0; payload_size as usize];
    rng.fill_bytes(&mut data);
    let start = Instant::now();
    for _ in 0..count {
        let _ = socket.send(&data).await;
    }

Then I get this (on WSL2):

taskset -c 1 cargo run --release -- --listen-port 12345 --payload-size 1450 --threads 1
    Finished release [optimized] target(s) in 0.67s
     Running `target/release/fixtokio --listen-port 12345 --payload-size 1450 --threads 1`
Get 888 bytes from 127.0.0.1:58772
start sending 5000000 1450 bytes packets to 127.0.0.1:58772
Get 888 bytes from 127.0.0.1:52640
start sending 5000000 1450 bytes packets to 127.0.0.1:52640
Get 888 bytes from 127.0.0.1:42390
58000 Mb sent to 127.0.0.1:58772 in 10.011149583s, 5793.540339827998 Mbps
start sending 5000000 1450 bytes packets to 127.0.0.1:42390
58000 Mb sent to 127.0.0.1:52640 in 10.354338794s, 5601.516462779526 Mbps
58000 Mb sent to 127.0.0.1:42390 in 11.820154366s, 4906.873291053629 Mbps

...............

taskset -c 0,1,2,3 cargo run --release -- --listen-port 12345 --payload-size 1450 --threads 1
    Finished release [optimized] target(s) in 0.61s
     Running `target/release/fixtokio --listen-port 12345 --payload-size 1450 --threads 1`
Get 888 bytes from 127.0.0.1:43979
start sending 5000000 1450 bytes packets to 127.0.0.1:43979
58000 Mb sent to 127.0.0.1:43979 in 32.663608535s, 1775.6764324907329 Mbps
[Explanation: 2 clients start at the same time here]
Get 888 bytes from 127.0.0.1:46425
start sending 5000000 1450 bytes packets to 127.0.0.1:46425
Get 888 bytes from 127.0.0.1:47081
start sending 5000000 1450 bytes packets to 127.0.0.1:47081
58000 Mb sent to 127.0.0.1:46425 in 6.56691725s, 8832.150087394353 Mbps
58000 Mb sent to 127.0.0.1:47081 in 9.388535477s, 6177.747286533227 Mbps

Partial log from 2 simultaneous clients while the taskset -c 0,1,2,3 server is running:

Receive Previous Sec:914194, Bandwidth:10113Mbps, Total:2657156
Receive Previous Sec:911186, Bandwidth:10080Mbps, Total:3568367

So it's almost as fast as the C version when this bug does not manifest.
Note the 5000-6000 Mbps when tasksetted to one core versus 1775.67 Mbps and 8800+6177 Mbps when multi-core.

My only guess is your testing machine is not multicore?

I don't have more info now; it's already as detailed as I can make it. Also, is that non-blocking flag on the std::net::UdpSocket-to-tokio::net::UdpSocket conversion actually necessary? This program is blocking by nature, so how does non-blocking help here, and how does tokio poll it? Is the overhead cost considered? I've used Linux epoll extensively before and had not set that flag.
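
For reference, this is the conversion path as tokio documents it (the helper name below is hypothetical): UdpSocket::from_std expects the std socket to already be in non-blocking mode, and it has to be called from within a runtime so the socket can be registered with the I/O driver; with a blocking fd, a full send buffer makes the send syscall block the worker thread instead of returning WouldBlock and yielding back to the scheduler.

use tokio::net::UdpSocket;

// Hypothetical helper showing the documented requirement: put the std socket
// into non-blocking mode *before* handing it to tokio. from_std must also be
// called from within a Tokio runtime so the socket can register with the
// runtime's I/O driver (epoll on Linux).
fn into_tokio(std_sock: std::net::UdpSocket) -> std::io::Result<UdpSocket> {
    std_sock.set_nonblocking(true)?;
    UdpSocket::from_std(std_sock)
}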

@Originalimoc
Author

Hoping for a follow-up.

@Originalimoc
Author

Still there
