fix(broadcast): fix and test try_recv #94
Conversation
Pull request overview
This PR fixes the broadcast::Receiver::try_recv method to correctly handle the race condition where the tail counter has been advanced but the slot has not yet been written. The fix uses wrapping arithmetic on version differences to distinguish between genuinely lagged slots, unwritten slots, and slots with matching versions.
Key changes:
- Refactored the version comparison logic to use `version_diff = head.wrapping_sub(slot.version)` for proper wrapping arithmetic (a sketch of the resulting branching follows this list)
- Added a check for unwritten slots (version matches but `msg` is `None`) to return `Empty` instead of incorrectly reporting `Lagged`
- Added a re-read of `tail` when the slot version is newer than expected, to confirm an actual lag condition
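For context, here is a minimal, self-contained sketch of the three-branch logic described above. The `Slot`, `Shared`, and `Receiver` shapes, the `Lagged(u64)` payload, and the lag-confirmation math are illustrative assumptions, not mea's actual internals.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum TryRecvError {
    Empty,
    Lagged(u64),
}

struct Slot<T> {
    version: u64,   // initialized to 0; set to the write position when written
    msg: Option<T>, // None means "claimed but not written yet"
}

struct Shared<T> {
    tail: AtomicU64,            // next position a sender will claim
    slots: Vec<Mutex<Slot<T>>>, // ring buffer of the last `slots.len()` values
}

struct Receiver<T> {
    head: u64, // next position this receiver expects to read
    shared: Arc<Shared<T>>,
}

impl<T: Clone> Receiver<T> {
    fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let head = self.head;
        let cap = self.shared.slots.len() as u64;
        let slot = self.shared.slots[(head % cap) as usize].lock().unwrap();

        // Wrapping difference between the position we expect and the slot's version.
        let version_diff = head.wrapping_sub(slot.version);

        if version_diff == 0 {
            // Versions match: either the message is there, or the sender has
            // advanced tail but not finished writing (or the slot is still in
            // its initial version-0 state) -> Empty, not Lagged.
            return if let Some(msg) = &slot.msg {
                self.head = head.wrapping_add(1);
                Ok(msg.clone())
            } else {
                Err(TryRecvError::Empty)
            };
        }

        if (version_diff as i64) > 0 {
            // The slot still holds an older version: nothing new for us yet.
            return Err(TryRecvError::Empty);
        }

        // The slot's version is logically newer than head, so our position may
        // have been overwritten. Re-read the latest tail to confirm the lag
        // before reporting it.
        let tail = self.shared.tail.load(Ordering::Acquire);
        let behind = tail.wrapping_sub(head);
        if behind > cap {
            let missed = behind - cap;
            self.head = tail.wrapping_sub(cap); // jump to the oldest retained value
            Err(TryRecvError::Lagged(missed))
        } else {
            // The newer version was a transient observation; treat it as Empty.
            Err(TryRecvError::Empty)
        }
    }
}
```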
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| mea/src/broadcast/mod.rs | Refactored try_recv logic to use wrapping version differences with three branches: exact match, older version, and newer version; added tail re-read for confirming lag |
| mea/src/broadcast/tests.rs | Added test case to verify that when tail is advanced but slot is unwritten, try_recv returns Empty rather than Lagged |
tisonkun
left a comment
Can we fix this by adding the following?

```rust
if slot.version == head {
    return if let Some(msg) = &slot.msg {
        self.head = head.wrapping_add(1);
        Ok(msg.clone())
    } else {
        Err(TryRecvError::Empty)
    };
}
```
Yeah. This is possible and helps in improving it.

That said, tokio's broadcast uses a Mutex to lock both `tail_cnt` and the waiter list. So if we find our own … But after one more consideration, I think our impl is reasonable anyway, so we can try to improve it first.
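For reference, a rough illustration of the approach mentioned here (not tokio's actual code): a single Mutex guards both the tail position and the waiter list, so advancing the position and waking parked receivers happen as one atomic step.

```rust
// Rough illustration only, not tokio's real internals: the point is that the
// tail counter and the waiter list live behind the same Mutex.
use std::sync::Mutex;
use std::task::Waker;

struct Tail {
    pos: u64,            // next position to write
    waiters: Vec<Waker>, // receivers parked until a new value arrives
}

struct SharedState {
    tail: Mutex<Tail>,
}

impl SharedState {
    fn publish_and_notify(&self) {
        let mut tail = self.tail.lock().unwrap();
        tail.pos = tail.pos.wrapping_add(1);
        // Drain waiters while still holding the lock so no receiver can
        // register against the old position after the bump.
        for waker in tail.waiters.drain(..) {
            waker.wake();
        }
    }
}
```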
I've updated the fix accordingly and re-committed the changes based on your proposal. I also fixed a small typo (`resv`) and a minor whitespace issue in the documentation in the process.
tisonkun
left a comment
LGTM. Merging and releasing ...
Well. I'll look into improving `WaitSet` deregistration in the future and propose it as a separate follow-up.
Following the comments in PR #90, change the logic in `broadcast::Receiver::try_recv` to be based on the wrapping version difference (`version_diff = head.wrapping_sub(slot.version)`). If the version matches and a message is present, return it normally. If the slot version is logically newer than `head`, re-read the latest tail and only report `Lagged` after confirming that the slot has indeed been overwritten. The slot version is initialized to 0 and is combined with `msg == None` to indicate the "not ready yet" state.

Add tests to verify that when the tail is advanced but the slot has not finished writing, `try_recv` returns `Empty` instead of incorrectly reporting `Lagged`.

By the way, while fixing the broadcast implementation, I realized that if a task is canceled while waiting (for example, due to `timeout(recv)`), its `Waker` may remain in the `WaitSet`. If the primitive does not get triggered for a long time (e.g. a long-idle broadcast channel, or a barrier that never reaches the final participant), these "zombie wakers" may never be cleaned up. In practice this is usually not a problem because events tend to be triggered frequently, but do we need to consider adjusting the `WaitSet` API for such extreme cases (for example, providing an `unregister` method and calling it in `Drop`)?
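To make that question concrete, here is a hypothetical sketch of the Drop-based cleanup being suggested. The `unregister` method, the key-based registration, and the `WaitGuard` type are all assumptions for illustration; they are not mea's current `WaitSet` API.

```rust
// Hypothetical sketch only: `WaitSet::unregister` does not exist today; this
// just illustrates removing a cancelled waiter's waker on Drop.
use std::sync::Mutex;
use std::task::Waker;

struct WaitSet {
    wakers: Mutex<Vec<(u64, Waker)>>, // (key, waker) pairs
}

impl WaitSet {
    fn register(&self, key: u64, waker: Waker) {
        self.wakers.lock().unwrap().push((key, waker));
    }

    // The proposed addition: remove a waker by key so a cancelled waiter
    // does not linger as a "zombie" until the next wakeup.
    fn unregister(&self, key: u64) {
        self.wakers.lock().unwrap().retain(|(k, _)| *k != key);
    }
}

// A guard held by the waiting future; dropping it (e.g. because
// `timeout(recv)` cancelled the wait) deregisters the waker.
struct WaitGuard<'a> {
    set: &'a WaitSet,
    key: u64,
}

impl Drop for WaitGuard<'_> {
    fn drop(&mut self) {
        self.set.unregister(self.key);
    }
}
```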