Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions mea/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,19 @@ impl<T: Clone> Receiver<T> {
let slot = shared.buffer[idx].read();

if slot.version == head {
if let Some(msg) = &slot.msg {
return if let Some(msg) = &slot.msg {
self.head = head.wrapping_add(1);
return Ok(msg.clone());
}
Ok(msg.clone())
} else {
Err(TryRecvError::Empty)
};
}

drop(slot);

// If version != head, the slot was overwritten.
// This means we lagged, but the `diff > cap` check missed it (likely due to overflow
// wrapping). We treat this as a lag.
drop(slot);

let missed = tail.wrapping_sub(self.head).wrapping_sub(cap);
self.head = tail.wrapping_sub(cap);
return Err(TryRecvError::Lagged(missed));
Expand Down
12 changes: 12 additions & 0 deletions mea/src/broadcast/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ async fn test_try_recv_lagged() {
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}

#[tokio::test]
async fn test_try_recv_unwritten_slot_is_empty() {
let (tx, mut rx) = channel::<u64>(2);
drop(tx);

// Simulate tail advanced but slot not written yet
rx.shared.tail_cnt.store(1, Ordering::SeqCst);

assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
assert_eq!(rx.head, 0);
}

#[tokio::test]
async fn test_multi_senders_concurrent() {
let (tx, mut rx) = channel(100);
Expand Down