Watch channel with many subscribers has sub-optimal multithreaded performance #5403

Closed
@tijsvd

Version

tokio v1.24.2

Platform

Linux x1 5.15.0-58-generic #64~20.04.1-Ubuntu SMP Fri Jan 6 16:42:31 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Description

Hi,

I encountered a performance issue in tokio::sync::watch. The usage pattern is something like:

  • there is a large number of networked subscribers
  • something happens centrally (reception of message from upstream)
  • that something is broadcast around using a watch channel
  • every subscriber has a little task running, which waits for changes on the channel, takes the latest value, does some subscriber-specific processing, sends out a message, and goes back to waiting

We use watch here instead of broadcast because we care only about the latest value, and this is an elegant way to deal with slow clients. I would think this is a fairly typical use case for a watch channel.

While doing some benchmarking, I saw a massive drop in throughput (messages actually received) when moving from flavor = "current_thread" to the multi-threaded runtime. I also expected high CPU usage due to parallel per-subscriber processing; instead, CPU stayed around 130-180% on an 8-vcore machine.

I have sort of reduced it to this example program:

use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (snd, _) = watch::channel(0i32);
    // 1000 subscriber tasks, each waiting for changes and reading the value
    for _ in 0..1000 {
        let mut rcv = snd.subscribe();
        tokio::spawn(async move {
            loop {
                if rcv.changed().await.is_err() {
                    break;
                }
                // read lock
                let _ = *rcv.borrow();
            }
        });
    }
    // publish a new value every millisecond
    for i in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        let _ = snd.send(i);
    }
}

Not much happens here, yet running this reports almost 6 seconds of CPU time used, of which 1.6s is system time. Changing to #[tokio::main(flavor = "current_thread")] reduces that to hardly anything (0.35s user time, 0.05s system). Running under strace also shows a lot of futex calls.

So apparently there is a lot of lock contention. Digging into it, I am reasonably sure that the following happens:

  • calling send on the watch::Sender translates to Notify::notify_waiters; this by itself is fine, as it sets all waiting tasks as runnable without locks held
  • tasks start running, and worker threads start stealing some
  • multiple threads are now processing the subscription tasks, and since they're fairly short, many end around the same time
  • at the end of the task loop, the tasks go back to waiting on the same channel, which requires re-registering with the Notify
  • the waiter list inside the Notify is behind a mutex, and this is where the contention is (a sketch isolating just this step follows below).
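
If that analysis is correct, the contended step can be isolated from watch entirely. Below is a minimal sketch (my own reduction, not what watch does internally) where 1000 tasks loop on notified().await against a single Notify; each iteration re-inserts the task into the Notify's mutex-protected waiter list, which is exactly the re-registration step described above. Run on the multi-threaded runtime, this should show the same futex traffic as the watch example.

use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    for _ in 0..1000 {
        let notify = notify.clone();
        tokio::spawn(async move {
            loop {
                // Each iteration re-registers this task in the Notify's
                // shared waiter list; this is the step `changed()` performs
                // when a receiver goes back to waiting.
                notify.notified().await;
            }
        });
    }
    for _ in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        // Wake every currently registered waiter, analogous to Sender::send.
        notify.notify_waiters();
    }
}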

If we accept that this is indeed a fairly typical use case, then it could, I think, be solved by making a different trade-off between send/receive performance and clone/subscribe/drop overhead:

  • give each receiver a unique 64-bit ID, allocated from an AtomicU64 in the shared state;
  • each receiver also has an Arc<AtomicWaker>;
  • inside the shared state, instead of a notifier, maintain a Mutex<HashMap<u64, Arc<AtomicWaker>>>, which is accessed only in Sender::send, Sender::subscribe, Receiver::clone, and Receiver::drop, not in Receiver::changed.

Sender code is then something like:

... // update value
... // update version
// collect the wakers while holding the lock, then wake them outside it
let wakers = inner
  .subscriptions
  .lock()
  .unwrap() // assuming a std::sync::Mutex around the map
  .values()
  .filter_map(|w| w.take())
  .collect::<Vec<_>>(); // FIXME reuse vector capacity
for waker in wakers {
  waker.wake();
}
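
For completeness, here is a sketch of the receiver side of the same scheme. The names, and the use of futures::task::AtomicWaker (the atomic-waker crate would work equally well), are my own illustration rather than actual tokio internals; value storage is elided and only the version counter and wake-up machinery are modelled. The point is that changed() touches only the receiver's own waker slot and an atomic version counter, never the subscription mutex:

use std::collections::HashMap;
use std::future::poll_fn;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Poll;

use futures::task::AtomicWaker;

// Shared state as proposed above; the stored value itself is omitted.
struct Shared {
    version: AtomicU64,
    next_id: AtomicU64,
    // Locked only in send/subscribe/clone/drop, never in `changed`.
    subscriptions: Mutex<HashMap<u64, Arc<AtomicWaker>>>,
}

struct Receiver {
    shared: Arc<Shared>,
    id: u64,
    waker: Arc<AtomicWaker>,
    seen: u64,
}

impl Receiver {
    // Hot path: no shared mutex, just this receiver's own waker slot and an
    // atomic version check.
    async fn changed(&mut self) {
        poll_fn(|cx| {
            // Register first, then re-check the version, so a send that
            // races with the registration is not missed.
            self.waker.register(cx.waker());
            let current = self.shared.version.load(Ordering::Acquire);
            if current != self.seen {
                self.seen = current;
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }
}

impl Drop for Receiver {
    fn drop(&mut self) {
        // Unsubscribing is a slow-path operation that takes the map lock.
        self.shared.subscriptions.lock().unwrap().remove(&self.id);
    }
}

Registering the waker before re-checking the version is the usual AtomicWaker pattern for not losing a wakeup when a send lands concurrently with the registration.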

Changing my example case above to something like this reduces the CPU overhead to ~1s, all of it user time. There is still non-negligible overhead (contention on the RwLock cache line), but no thread stalls.

I did something like this in my production code, which solved the problem. I actually created a special type of notifier for it, which is another consideration, but the use cases for Notify are probably more diverse than those for watch.

Labels

A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug), M-sync (Module: tokio/sync), T-performance (Topic: performance and benchmarks)
