Skip to content

[stdlib]: Propagate AsyncStream termination to all consumers #75878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 11, 2024

Conversation

jamieQ
Copy link
Contributor

@jamieQ jamieQ commented Aug 14, 2024

As of the changes in #41713 to enable Sendability for AsyncStream, it has been possible to create multiple stream consumers operating concurrently. This change fixes behavior in the case that the underlying stream is terminated while multiple pending continuations are outstanding. Previously such consumers would have been leaked (never resumed). Now, they are notified of the stream's termination and resumed appropriately.

Resolves #66541 & #71412

@jamieQ
Copy link
Contributor Author

jamieQ commented Aug 14, 2024

cc @phausler, @FranzBusch – if you get some time, i'd appreciate your thoughts on the direction taken here to address this issue.

@jamieQ jamieQ force-pushed the stream-termination branch from 9f469e8 to 87daac5 Compare August 14, 2024 12:59
Comment on lines 215 to 239
guard !state.continuations.isEmpty else {
unlock()
handler?(.finished)
return
}

// Get elements to deliver to outstanding continuations.
let upperBound = Swift.min(state.continuations.count, state.pending.count)
let elementsRange = 0..<upperBound
let elementsToSend = state.pending[elementsRange]
state.pending.removeSubrange(elementsRange)

// Hold on to the continuations to resume outside the lock.
let continuations = state.continuations
state.continuations = []

unlock()
handler?(.finished)

for (idx, continuation) in continuations.enumerated() {
if idx < elementsToSend.count {
continuation.resume(returning: elementsToSend[idx])
} else {
continuation.resume(returning: nil)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have to do anything with pending elements here. If we have a continuation then we must not have any elements buffered. That's a precondition in our internal state. I would be surprised if that can ever happen. I think we just need to loop through all stored continuations and resume them with nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, thanks for pointing that out – not having to perform this calculation should make things simpler. since i was reasoning locally from the existing implementation, it appeared that case was previously handled, but sounds like that was perhaps an unreachable branch? do you think an actual assertion/precondition of some form is warranted to enforce this invariant (or maybe that's already done and i've overlooked it)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code here is reasonable complex. Before we assume that this branch is unreachable we should do at least a code read to reason through it. If it truly is unreachable I am happy to put a precondition here since it is only on the finish path.

Copy link
Contributor Author

@jamieQ jamieQ Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

your assessment seems correct to me. the only place where continuations are added is in next(...), and the invariant holds there. the only place pending elements are added is yield(...), and the logic there has 2 cases:

  1. if we have stored continuations:
    • by the end of this branch, the total number of continuations will decrease by 1, while the number of stored pending elements will remain unchanged
  2. if we have no stored continuations:
    • by the end of this branch, the total number of continuations is unchanged (stays at 0), and we will have added at most 1 new pending element (if the stream isn't terminated)

so i think we can sort of inductively conclude the invariant always holds.

as a consequence, it seems to me the logic in the yield(...) method can also likely be simplified, as it appears to contain a dead code path, given this observation.

Copy link
Member

@FranzBusch FranzBusch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to point out that this is again a diverge between AsyncStream and AsyncThrowingStream. Now this here fixes a bug in the current divergence but it is something we should tackle at some point

@jamieQ
Copy link
Contributor Author

jamieQ commented Aug 14, 2024

I just want to point out that this is again a diverge between AsyncStream and AsyncThrowingStream. Now this here fixes a bug in the current divergence but it is something we should tackle at some point

i agree, though fixing this bug and unifying the implementations (and updating the docs) seem like reasonably distinct tasks.

@jamieQ jamieQ force-pushed the stream-termination branch 2 times, most recently from 2f4681e to 4ff5270 Compare August 17, 2024 04:49
As of the changes in swiftlang#41713 to
enable Sendability for AsyncStream, it has been possible to create
multiple stream consumers operating concurrently. This change fixes
behavior in the case that the underlying stream is terminated while
multiple pending continuations are outstanding. Previously such
consumers would have been leaked (never resumed). Now, they are notified
of the stream's termination and resumed appropriately.

Resolves swiftlang#66541 & swiftlang#71412
@jamieQ jamieQ force-pushed the stream-termination branch from 4ff5270 to 3bea96a Compare August 20, 2024 12:26
@jamieQ jamieQ marked this pull request as ready for review August 20, 2024 12:44
@jamieQ jamieQ requested a review from ktoso as a code owner August 20, 2024 12:44
@jamieQ
Copy link
Contributor Author

jamieQ commented Aug 27, 2024

@FranzBusch @ktoso – any feedback on this? thanks in advance!

Copy link
Member

@FranzBusch FranzBusch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! @ktoso are you happy to merge this?

@FranzBusch
Copy link
Member

@swift-ci please smoke test

@FranzBusch
Copy link
Member

@swift-ci test windows

@jamieQ
Copy link
Contributor Author

jamieQ commented Sep 11, 2024

ping @FranzBusch @ktoso – anything else to do here?

Copy link
Contributor

@ktoso ktoso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, sorry I missed the earlier ping. Thank you for working on this!

@ktoso ktoso merged commit 4aaf471 into swiftlang:main Sep 11, 2024
3 checks passed
@jamieQ jamieQ deleted the stream-termination branch September 12, 2024 11:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AsyncStream multi-consumptions does not propagate .finished event to all consumers
3 participants