
Quadratic complexity in FuturesUnordered #2526

Closed
@cvybhu


There might be a performance bug in FuturesUnordered.

Here's an example that puts n async futures into a FuturesUnordered and waits for all of them to finish.
Each task locks an async mutex and immediately releases it, so the tasks finish one by one.

It looks like the execution time is quadratic: each time n doubles, the time increases roughly 4x.

n: 10000, time: 23ms
n: 20000, time: 89ms
n: 40000, time: 338ms
n: 80000, time: 1304ms
n: 160000, time: 5181ms
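As a quick sanity check on the quadratic claim: if the run time behaves like T(n) ≈ c·n^k, then the exponent between two consecutive runs is k = ln(T2/T1) / ln(n2/n1), which for a doubling of n is simply log2 of the time ratio. The sketch below is plain std Rust (the names FUTURES_UNORDERED_MS and growth_exponents are illustrative only) and computes k from the timings above; values close to 2 are what a quadratic cost would produce, values close to 1 would indicate linear scaling.

```rust
/// Timings copied from the FuturesUnordered run above: (n, milliseconds).
const FUTURES_UNORDERED_MS: [(f64, f64); 5] = [
    (10_000.0, 23.0),
    (20_000.0, 89.0),
    (40_000.0, 338.0),
    (80_000.0, 1304.0),
    (160_000.0, 5181.0),
];

/// Empirical exponent k between consecutive runs, assuming T(n) ≈ c * n^k:
/// k = ln(T2 / T1) / ln(n2 / n1).
fn growth_exponents(data: &[(f64, f64)]) -> Vec<f64> {
    data.windows(2)
        .map(|w| {
            let ((n1, t1), (n2, t2)) = (w[0], w[1]);
            (t2 / t1).ln() / (n2 / n1).ln()
        })
        .collect()
}

fn main() {
    for k in growth_exponents(&FUTURES_UNORDERED_MS) {
        println!("k ≈ {k:.2}");
    }
}
```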

The documentation states:

Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications.

The implementation also tries hard to avoid quadratic complexity, so this looks like a bug.
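To make the documented guarantee concrete, here is a toy, single-threaded model of a wake-driven ready queue: every task gets a waker that pushes the task's index onto a shared queue, and the loop polls only tasks whose index was pushed. This is a hypothetical std-only sketch (YieldOnce, TaskWaker and run_all are made-up names) and not futures-rs's actual implementation; it only illustrates the policy "poll on wake-up". With n tasks that each need exactly one wake-up, the model performs 2n polls in total, so polling work grows with the number of wake-ups rather than with n per completion.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending once (waking itself first), then completes.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // request another poll
            Poll::Pending
        }
    }
}

/// Waking a task pushes its index onto the shared ready queue.
/// (A real implementation would also deduplicate repeated wake-ups.)
struct TaskWaker {
    id: usize,
    ready: Arc<Mutex<VecDeque<usize>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.ready.lock().unwrap().push_back(self.id);
    }
}

/// Runs n YieldOnce tasks, polling only woken ones; returns the total poll count.
fn run_all(n: usize) -> usize {
    // Every task starts out "woken" so that it gets its first poll.
    let ready = Arc::new(Mutex::new((0..n).collect::<VecDeque<usize>>()));
    let mut tasks: Vec<Option<YieldOnce>> =
        (0..n).map(|_| Some(YieldOnce { yielded: false })).collect();
    let mut polls = 0;
    loop {
        // The queue lock is released at the end of this statement, before polling.
        let next = ready.lock().unwrap().pop_front();
        let id = match next {
            Some(id) => id,
            None => break, // nothing was woken: stop
        };
        if let Some(fut) = tasks[id].as_mut() {
            let waker = Waker::from(Arc::new(TaskWaker { id, ready: Arc::clone(&ready) }));
            let mut cx = Context::from_waker(&waker);
            polls += 1;
            if Pin::new(fut).poll(&mut cx).is_ready() {
                tasks[id] = None; // completed tasks are never polled again
            }
        }
    }
    polls
}

fn main() {
    for n in [10, 1_000, 100_000] {
        println!("n = {n}: {} polls", run_all(n));
    }
}
```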

We ran into this issue when trying to perform many concurrent database queries in scylladb/scylla-rust-driver#362.

main.rs:

use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;

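async fn do_task(mutex: Arc<Mutex<()>>) {
    // Acquire the lock and release it immediately: the guard is a temporary
    // that is dropped at the end of the statement below.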
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    // Hold the lock so that none of the n tasks can complete yet.
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(do_task(mutex.clone()));
    }
    let mut futs_unordered: FuturesUnordered<_> = futs.into_iter().collect();

    // Release the lock; the tasks can now acquire it one after another.
    std::mem::drop(mutex_guard);

    // Drive the FuturesUnordered until all n futures have completed.
    for _ in 0..n {
        futs_unordered.select_next_some().await;
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}

Cargo.toml:

[package]
name = "futures-unordered-slow"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "=1.14.0", features = ["rt-multi-thread", "sync", "macros"] }
futures = "=0.3.18"

Doing the same thing with tokio::spawn makes the time linear again:

main.rs:

use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(tokio::spawn(do_task(mutex.clone())));
    }

    std::mem::drop(mutex_guard);

    for f in futs {
        f.await.unwrap();
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}

Output:

n: 10000, time: 6ms
n: 20000, time: 12ms
n: 40000, time: 25ms
n: 80000, time: 52ms
n: 160000, time: 104ms
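Putting the two runs side by side: dividing each FuturesUnordered time by the matching tokio::spawn time gives the slowdown at each n. If one approach costs O(n^2) and the other O(n), that slowdown should itself roughly double each time n doubles. The snippet below is a small std-only sketch over the numbers reported in this issue (RUNS and slowdowns are made-up names); it does not re-run either benchmark.

```rust
/// (n, FuturesUnordered ms, tokio::spawn ms), copied from the two runs in this issue.
const RUNS: [(u32, f64, f64); 5] = [
    (10_000, 23.0, 6.0),
    (20_000, 89.0, 12.0),
    (40_000, 338.0, 25.0),
    (80_000, 1304.0, 52.0),
    (160_000, 5181.0, 104.0),
];

/// Slowdown of FuturesUnordered relative to tokio::spawn for each n.
fn slowdowns() -> Vec<(u32, f64)> {
    RUNS.iter().map(|&(n, fu, spawn)| (n, fu / spawn)).collect()
}

fn main() {
    for (n, s) in slowdowns() {
        println!("n: {n}, FuturesUnordered is {s:.1}x slower");
    }
}
```

With the numbers above this gives roughly 3.8x at n = 10000, growing to about 49.8x at n = 160000.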
