Description
There might be a performance bug in FuturesUnordered.
Here's an example, which puts n async futures in FuturesUnordered and waits for all of them to finish.
Each task locks an async mutex and immediately frees it, which causes them to finish one by one.
The execution time appears to be quadratic: each time n doubles, the time roughly quadruples.
n: 10000, time: 23ms
n: 20000, time: 89ms
n: 40000, time: 338ms
n: 80000, time: 1304ms
n: 160000, time: 5181ms
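To make the "quadratic" reading of these numbers explicit, here is a small sketch that estimates the exponent k in time ~ n^k from each consecutive pair of measurements. It uses only the timings listed above; the helper names `growth_exponent` and `pairwise_exponents` are made up for this illustration and are not part of any library.

```rust
/// Estimate the exponent k in `time ~ n^k` from two measurements.
/// (Hypothetical helper, written for this illustration only.)
fn growth_exponent(n1: f64, t1: f64, n2: f64, t2: f64) -> f64 {
    (t2 / t1).ln() / (n2 / n1).ln()
}

/// Exponent estimates for each consecutive pair of (n, time in ms) samples.
fn pairwise_exponents(samples: &[(f64, f64)]) -> Vec<f64> {
    samples
        .windows(2)
        .map(|w| growth_exponent(w[0].0, w[0].1, w[1].0, w[1].1))
        .collect()
}

fn main() {
    // Timings reported above for the FuturesUnordered benchmark.
    let samples = [
        (10_000.0, 23.0),
        (20_000.0, 89.0),
        (40_000.0, 338.0),
        (80_000.0, 1304.0),
        (160_000.0, 5181.0),
    ];
    for k in pairwise_exponents(&samples) {
        println!("estimated exponent: {:.2}", k);
    }
}
```

Every estimate comes out close to 2, which is what quadratic growth would look like. Five data points from one machine are not proof, but they are consistent with it.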
The documentation states:
"Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications."
The implementation also tries hard to avoid quadratic complexity, so this looks like a bug.
We ran into this issue when trying to perform many concurrent database queries in scylladb/scylla-rust-driver#362.
main.rs:
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    // Acquire the lock; the guard is dropped immediately, releasing it.
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    // Hold the lock so that no task can finish until all of them are queued.
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(do_task(mutex.clone()));
    }
    let mut futs_unordered: FuturesUnordered<_> = futs.into_iter().collect();

    // Release the lock; the tasks now acquire it and finish one by one.
    std::mem::drop(mutex_guard);

    for _ in 0..n {
        futs_unordered.select_next_some().await;
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}
Cargo.toml:
[package]
name = "futures-unordered-slow"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "=1.14.0", features = ["rt-multi-thread", "sync", "macros"] }
futures = "=0.3.18"
Doing this using tokio::spawn makes the time linear again:
main.rs:
use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    // Acquire the lock; the guard is dropped immediately, releasing it.
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    // Hold the lock so that no task can finish until all of them are spawned.
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        // Each task is spawned onto the runtime instead of FuturesUnordered.
        futs.push(tokio::spawn(do_task(mutex.clone())));
    }

    // Release the lock; the tasks now acquire it and finish one by one.
    std::mem::drop(mutex_guard);

    for f in futs {
        f.await.unwrap();
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}
n: 10000, time: 6ms
n: 20000, time: 12ms
n: 40000, time: 25ms
n: 80000, time: 52ms
n: 160000, time: 104ms