Version
tokio v1.24.2
Platform
Linux x1 5.15.0-58-generic #64~20.04.1-Ubuntu SMP Fri Jan 6 16:42:31 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Description
Hi,
I encountered a performance issue in `tokio::sync::watch`. The usage pattern is something like:
- there is a large number of networked subscribers
- something happens centrally (reception of message from upstream)
- that something is broadcast around using a `watch` channel
- every subscriber has a little task running, which waits for changes on the channel, takes the latest value, does some subscriber-specific processing, sends out a message, and goes back to waiting
We use `watch` here instead of `broadcast` because we care only about the latest value, and this is an elegant way to deal with slow clients. I would think this is a fairly typical use case for a `watch` channel.
While doing some benchmarking, I saw a massive drop in throughput (actually received messages) when moving from `flavor = "current_thread"` to the multi-threaded runtime. I also expected high CPU usage due to parallel per-subscriber processing; instead, CPU stayed around 130-180% on an 8-vcore machine.
I have sort of reduced it to this example program:
```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (snd, _) = watch::channel(0i32);
    for _ in 0..1000 {
        let mut rcv = snd.subscribe();
        tokio::spawn(async move {
            loop {
                if rcv.changed().await.is_err() {
                    break;
                }
                // read lock
                let _ = *rcv.borrow();
            }
        });
    }
    for i in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        let _ = snd.send(i);
    }
}
```
Not much happens here, yet running this reports almost 6 seconds of CPU time used, of which 1.6s is system time. Changing to `#[tokio::main(flavor = "current_thread")]` changes that to hardly anything (0.35s user time, 0.05s system). Also, running with `strace` shows a lot of `futex` calls.
So apparently there's a lot of lock contention. Digging into it, I think (well, I'm reasonably sure) that the following happens:
- calling `send` on the `watch::Sender` translates to `Notify::notify_waiters`; this by itself is fine, it sets all waiting tasks as runnable without locks held
- tasks start running, and worker threads start stealing some
- multiple threads are now processing the subscription tasks, and since they're fairly short, many end around the same time
- at the end of the task loop, the tasks will go back to waiting on the same channel, which requires re-registering with the `Notify`
- the waiter list inside the `Notify` is behind a mutex, and this is where the contention is (a simplified model of that pattern is sketched below).
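To make the mechanism concrete, here is a minimal, hypothetical model of a mutex-guarded waiter list of the kind I believe is involved. This is not tokio's actual `Notify` code, just an illustration of why frequent re-registration serializes on a single lock:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::Waker;

// Simplified stand-in for a shared waiter list guarded by a mutex (illustrative
// only, not tokio's implementation). Every waiting receiver must lock the list
// to (re-)register its waker, and the sender locks it to drain and wake all
// waiters. With ~1000 tasks re-registering at roughly the same time on several
// worker threads, this lock (and the futex behind it) is the hot spot.
struct WaiterList {
    waiters: Mutex<VecDeque<Waker>>,
}

impl WaiterList {
    // Receiver poll path: serializes on the mutex every time a task goes back
    // to waiting.
    fn register(&self, waker: &Waker) {
        self.waiters.lock().unwrap().push_back(waker.clone());
    }

    // Sender path: drain the waiters under the lock, then wake them outside it.
    fn notify_all(&self) {
        let wakers: Vec<Waker> = self.waiters.lock().unwrap().drain(..).collect();
        for w in wakers {
            w.wake();
        }
    }
}
```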
If we accept that this is indeed a fairly typical use case, then it could, I think, be solved by making a different trade-off between send/receive performance and clone/subscribe/drop overhead:

- give each receiver a unique 64-bit ID, allocated from an `AtomicU64` in the shared state;
- each receiver also has an `Arc<AtomicWaker>`;
- inside the shared state, instead of a notifier, maintain a `Mutex<HashMap<u64, Arc<AtomicWaker>>>`, which is accessed in `Sender::send`, `Sender::subscribe`, `Receiver::clone` and `Receiver::drop` only, so not in `Receiver::changed`.
Sender code is then something like:

```rust
... // update value
... // update version
let wakers = inner
    .subscriptions
    .lock()
    .values()
    .filter_map(|w| w.take())
    .collect::<Vec<_>>(); // FIXME reuse vector capacity
for waker in wakers {
    waker.wake();
}
```
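For completeness, here is a rough sketch of what the receiver side could look like under this scheme, using `futures::task::AtomicWaker`. All names (`Shared`, `poll_changed`, etc.) are made up for illustration, not a proposed API:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use futures::task::AtomicWaker;

// Hypothetical shared state for the scheme described above.
struct Shared {
    version: AtomicU64,
    next_id: AtomicU64,
    // Only locked in send/subscribe/clone/drop, never in changed().
    subscriptions: Mutex<HashMap<u64, Arc<AtomicWaker>>>,
}

struct Receiver {
    shared: Arc<Shared>,
    id: u64,
    waker: Arc<AtomicWaker>,
    seen_version: u64,
}

impl Receiver {
    // Poll path of a hypothetical changed(): touches only this receiver's own
    // AtomicWaker, so concurrent receivers do not contend on a shared lock.
    fn poll_changed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // Register first, then re-check the version, to avoid a lost wakeup
        // if send() runs in between.
        self.waker.register(cx.waker());
        let current = self.shared.version.load(Ordering::Acquire);
        if current != self.seen_version {
            self.seen_version = current;
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

impl Drop for Receiver {
    fn drop(&mut self) {
        // The subscription map is only touched here and in send/subscribe/clone.
        self.shared.subscriptions.lock().unwrap().remove(&self.id);
    }
}
```

Subscribe/clone would allocate a fresh ID from `next_id` and insert the new `AtomicWaker` into the map, so all the mutex traffic stays on those comparatively rare operations.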
Changing my example case above to something like this reduces the CPU overhead to ~1s, all of it user time. There is still non-negligible overhead (contention on the rwlock cache line), but no thread stalls.
I did something like this in my production code, which solved the problem. I actually created a special type of notifier, which is another consideration, but the use cases for `Notify` are probably more diverse than those for `watch`.