Skip to content

Conversation

@orthur2
Copy link
Contributor

@orthur2 orthur2 commented Dec 12, 2025

Following the comments in PR #90, change the logic in broadcast::Receiver::try_recv to be based on the wrapping version difference (version_diff = head.wrapping_sub(slot.version)). If the version matches and a message is present, return it normally. If the slot version is logically newer than head, re-read the latest tail and only report Lagged after confirming that the slot has indeed been overwritten.The slot version is initialized to 0, and combined with msg == None to indicate the not ready yet state.

Add tests to verify that when tail is advanced but the slot has not finished writing, try_recv returns Empty instead of incorrectly reporting Lagged.

By the way, while fixing the broadcast implementation, I realized that if a task is canceled while waiting (for example, due to timeout(recv)), its Waker may remain in the WaitSet. If the primitive does not get triggered for a long time (e.g. a long-idle broadcast channel, or a barrier that never reaches the final participant), these zombie wakers may never be cleaned up. In practice this is usually not a problem because events tend to be triggered frequently, but do we need to consider adjusting the WaitSet API for such extreme cases (for example, providing an unregister method and calling it in Drop`)?

@orthur2 orthur2 marked this pull request as ready for review December 12, 2025 09:52
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes the broadcast::Receiver::try_recv method to correctly handle the race condition where the tail counter has been advanced but the slot has not yet been written. The fix uses wrapping arithmetic on version differences to distinguish between genuinely lagged slots, unwritten slots, and slots with matching versions.

Key changes:

  • Refactored version comparison logic to use version_diff = head.wrapping_sub(slot.version) for proper wrapping arithmetic
  • Added check for unwritten slots (version matches but msg is None) to return Empty instead of incorrectly reporting Lagged
  • Added re-read of tail when slot version is newer than expected to confirm actual lag condition

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
mea/src/broadcast/mod.rs Refactored try_recv logic to use wrapping version differences with three branches: exact match, older version, and newer version; added tail re-read for confirming lag
mea/src/broadcast/tests.rs Added test case to verify that when tail is advanced but slot is unwritten, try_recv returns Empty rather than Lagged

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Collaborator

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this by adding:

            if slot.version == head {
                return if let Some(msg) = &slot.msg {
                    self.head = head.wrapping_add(1);
                    Ok(msg.clone())
                } else {
                    Err(TryRecvError::Empty)
                }
            }

?

@tisonkun
Copy link
Collaborator

tisonkun commented Dec 14, 2025

but do we need to consider adjusting the WaitSet API for such extreme cases

Yeah. This is possible and helps in improving Barrier/WaitGroup/Latch. Previously, I didn't write it because I didn't (and all these waiters are supposed to be finished in a reasonable time). But anyway you can follow what Acquire in internal::semaphore does to do a deregister.

@tisonkun
Copy link
Collaborator

tisonkun commented Dec 14, 2025

That said, tokio's broadcast uses a Mutex to lock both tail_cnt and the waiter list. So if we find our own AtomicU64 + Mutex<WaitSet> to a big Mutex<(u64, WaitSet)> // to be a struct.

But after one more consider I think our impl is reasonable anyway so we can try to improve it first.

@BewareMyPower BewareMyPower changed the title fix(broadcast): fix and test try_resv fix(broadcast): fix and test try_recv Dec 14, 2025
@orthur2
Copy link
Contributor Author

orthur2 commented Dec 17, 2025

Can we fix this by adding:

            if slot.version == head {
                return if let Some(msg) = &slot.msg {
                    self.head = head.wrapping_add(1);
                    Ok(msg.clone())
                } else {
                    Err(TryRecvError::Empty)
                }
            }

?

I' ve updated the fix accordingly and re-committed the changes based on your proposal. I also fixed a small typo (resv) and a minor whitespace issue in the documentation in the process.

Copy link
Collaborator

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Merging and releasing ...

@tisonkun tisonkun merged commit 768e9be into fast:main Dec 17, 2025
9 checks passed
@orthur2
Copy link
Contributor Author

orthur2 commented Dec 17, 2025

That said, tokio's broadcast uses a Mutex to lock both tail_cnt and the waiter list. So if we find our own AtomicU64 + Mutex<WaitSet> to a big Mutex<(u64, WaitSet)> // to be a struct.也就是说,tokio 的广播使用互斥锁来锁定 tail_cnt 和等待者列表。因此,如果我们找到自己的 AtomicU64 + Mutex<WaitSet> 到大 Mutex<(u64, WaitSet)> // to be a struct

But after one more consider I think our impl is reasonable anyway so we can try to improve it first.但仔细考虑之后,我认为我们的实现方式总体上是合理的,所以我们可以先尝试改进它。

Well. I’ll look into improving WaitSet deregistration in the future and propose it as a separate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants