[stdlib]: Propagate AsyncStream termination to all consumers #75878
Conversation
cc @phausler, @FranzBusch – if you get some time, I'd appreciate your thoughts on the direction taken here to address this issue.
Force-pushed from 9f469e8 to 87daac5
```swift
guard !state.continuations.isEmpty else {
  unlock()
  handler?(.finished)
  return
}

// Get elements to deliver to outstanding continuations.
let upperBound = Swift.min(state.continuations.count, state.pending.count)
let elementsRange = 0..<upperBound
let elementsToSend = state.pending[elementsRange]
state.pending.removeSubrange(elementsRange)

// Hold on to the continuations to resume outside the lock.
let continuations = state.continuations
state.continuations = []

unlock()
handler?(.finished)

for (idx, continuation) in continuations.enumerated() {
  if idx < elementsToSend.count {
    continuation.resume(returning: elementsToSend[idx])
  } else {
    continuation.resume(returning: nil)
  }
}
```
I don't think we have to do anything with pending elements here. If we have a continuation, then we must not have any elements buffered; that's a precondition in our internal state. I would be surprised if that can ever happen. I think we just need to loop through all stored continuations and resume them with `nil`.
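The suggested simplification can be sketched with a toy model (the `ToyStream`, `addWaiter`, and `onTermination` names here are hypothetical, not the stdlib's internals): once a non-empty continuation list implies an empty element buffer, the finish path needs no pending-element bookkeeping at all and simply resumes every stored waiter with `nil`.

```swift
// Toy sketch of the simplified finish path. This assumes the invariant the
// reviewer describes: if any continuations are stored, the buffer is empty.
final class ToyStream {
    private var continuations: [(Int?) -> Void] = []
    private var onTermination: (() -> Void)?

    init(onTermination: (() -> Void)? = nil) {
        self.onTermination = onTermination
    }

    // Stand-in for a consumer suspending in next().
    func addWaiter(_ resume: @escaping (Int?) -> Void) {
        continuations.append(resume)
    }

    func finish() {
        // In the real implementation this section would run under the lock.
        let handler = onTermination
        onTermination = nil
        let waiters = continuations
        continuations = []
        // ...lock released here; resume outside the critical section...

        handler?()
        // No pending-element calculation needed: terminate every waiter.
        for resume in waiters {
            resume(nil)
        }
    }
}
```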
Ah, thanks for pointing that out – not having to perform this calculation should make things simpler. Since I was reasoning locally from the existing implementation, it appeared that case was previously handled, but it sounds like that was perhaps an unreachable branch? Do you think an actual assertion/precondition of some form is warranted to enforce this invariant (or maybe that's already done and I've overlooked it)?
The code here is reasonably complex. Before we assume that this branch is unreachable, we should do at least a code read to reason through it. If it truly is unreachable, I am happy to put a precondition here, since it is only on the finish path.
Your assessment seems correct to me. The only place where continuations are added is in `next(...)`, and the invariant holds there. The only place pending elements are added is `yield(...)`, and the logic there has 2 cases:
- if we have stored continuations:
  - by the end of this branch, the total number of continuations will decrease by 1, while the number of stored pending elements will remain unchanged
- if we have no stored continuations:
  - by the end of this branch, the total number of continuations is unchanged (stays at 0), and we will have added at most 1 new pending element (if the stream isn't terminated)

So I think we can inductively conclude that the invariant always holds. As a consequence, it seems to me the logic in the `yield(...)` method can likely also be simplified, as it appears to contain a dead code path given this observation.
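The case analysis above can be sketched as a toy model (the `ToyState` name and shapes are illustrative, not the actual stdlib state type, which also involves locks, buffer limits, and termination handlers). The comments mark where each branch preserves the invariant that `continuations` and `pending` are never both non-empty:

```swift
// Toy model of the invariant argument.
struct ToyState {
    var continuations: [(Int?) -> Void] = []
    var pending: [Int] = []
    var terminal = false

    // The only place continuations are stored; one is only added when
    // `pending` is empty, so the invariant holds here.
    mutating func next(_ continuation: @escaping (Int?) -> Void) {
        if !pending.isEmpty {
            continuation(pending.removeFirst())
        } else if terminal {
            continuation(nil)
        } else {
            continuations.append(continuation)
        }
    }

    // The only place pending elements are added, with the two cases from
    // the comment above.
    mutating func yield(_ value: Int) {
        if !continuations.isEmpty {
            // Case 1: continuation count drops by 1, pending is unchanged.
            let continuation = continuations.removeFirst()
            continuation(value)
        } else if !terminal {
            // Case 2: continuation count stays at 0, pending grows by
            // at most 1.
            pending.append(value)
        }
    }
}
```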
I just want to point out that this is again a divergence between `AsyncStream` and `AsyncThrowingStream`. This change fixes a bug within the current divergence, but the divergence itself is something we should tackle at some point.
I agree, though fixing this bug and unifying the implementations (and updating the docs) seem like reasonably distinct tasks.
Force-pushed from 2f4681e to 4ff5270
As of the changes in swiftlang#41713 to enable Sendability for AsyncStream, it has been possible to create multiple stream consumers operating concurrently. This change fixes behavior in the case that the underlying stream is terminated while multiple pending continuations are outstanding. Previously such consumers would have been leaked (never resumed). Now, they are notified of the stream's termination and resumed appropriately. Resolves swiftlang#66541 & swiftlang#71412
Force-pushed from 4ff5270 to 3bea96a
@FranzBusch @ktoso – any feedback on this? Thanks in advance!
LGTM! @ktoso are you happy to merge this?
@swift-ci please smoke test
@swift-ci test windows
ping @FranzBusch @ktoso – anything else to do here?
This looks good, sorry I missed the earlier ping. Thank you for working on this!
As of the changes in #41713 to enable Sendability for AsyncStream, it has been possible to create multiple stream consumers operating concurrently. This change fixes behavior in the case that the underlying stream is terminated while multiple pending continuations are outstanding. Previously such consumers would have been leaked (never resumed). Now, they are notified of the stream's termination and resumed appropriately.
Resolves #66541 & #71412
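A minimal end-to-end sketch of the fixed behavior, assuming a Swift 5.9+ toolchain (for `makeStream`) that includes this change: several consumers iterate the same stream concurrently, and after `finish()` every `for await` loop completes rather than leaking a suspended continuation.

```swift
// Three concurrent consumers of one AsyncStream. Pre-fix, consumers
// suspended in next() at termination time could be leaked; post-fix,
// all of them observe termination and their loops exit.
let (stream, continuation) = AsyncStream<Int>.makeStream()

let finishedConsumers = await withTaskGroup(of: Int.self, returning: Int.self) { group in
    for _ in 1...3 {
        group.addTask {
            // Each loop exits once the stream terminates.
            for await _ in stream {}
            return 1
        }
    }
    continuation.finish()
    // Count how many consumers completed their iteration.
    return await group.reduce(0, +)
}
print(finishedConsumers)
```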