Skip to content

Enable Sendability for AsyncStream and AsyncThrowingStream #41713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,6 @@ extension AsyncStream.Continuation {
return storage.yield(())
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream: @unchecked Sendable where Element: Sendable { }
46 changes: 22 additions & 24 deletions stdlib/public/Concurrency/AsyncStreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Swift
import Darwin

func _lockWordCount() -> Int {
let sz =
let sz =
MemoryLayout<os_unfair_lock>.size / MemoryLayout<UnsafeRawPointer>.size
return max(sz, 1)
}
Expand Down Expand Up @@ -57,7 +57,7 @@ extension AsyncStream {
typealias TerminationHandler = @Sendable (Continuation.Termination) -> Void

struct State {
var continuation: UnsafeContinuation<Element?, Never>?
var continuations = [UnsafeContinuation<Element?, Never>]()
var pending = _Deque<Element>()
let limit: Continuation.BufferingPolicy
var onTermination: TerminationHandler?
Expand Down Expand Up @@ -105,7 +105,7 @@ extension AsyncStream {
}
}

func cancel() {
@Sendable func cancel() {
lock()
// swap out the handler before we invoke it to prevent double cancel
let handler = state.onTermination
Expand All @@ -123,7 +123,9 @@ extension AsyncStream {
lock()
let limit = state.limit
let count = state.pending.count
if let continuation = state.continuation {

if !state.continuations.isEmpty {
let continuation = state.continuations.removeFirst()
if count > 0 {
if !state.terminal {
switch limit {
Expand Down Expand Up @@ -151,17 +153,14 @@ extension AsyncStream {
} else {
result = .terminated
}
state.continuation = nil
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if state.terminal {
state.continuation = nil
result = .terminated
unlock()
continuation.resume(returning: nil)
} else {
state.continuation = nil
switch limit {
case .unbounded:
result = .enqueued(remaining: .max)
Expand Down Expand Up @@ -212,15 +211,15 @@ extension AsyncStream {
state.onTermination = nil
state.terminal = true

if let continuation = state.continuation {
if let continuation = state.continuations.first {
if state.pending.count > 0 {
state.continuation = nil
state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: toSend)
} else if state.terminal {
state.continuation = nil
state.continuations.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: nil)
Expand All @@ -236,22 +235,20 @@ extension AsyncStream {

func next(_ continuation: UnsafeContinuation<Element?, Never>) {
lock()
if state.continuation == nil {
if state.pending.count > 0 {
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if state.terminal {
unlock()
continuation.resume(returning: nil)
} else {
state.continuation = continuation
unlock()
}
state.continuations.append(continuation)
if state.pending.count > 0 {
let cont = state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
cont.resume(returning: toSend)
} else if state.terminal {
let cont = state.continuations.removeFirst()
unlock()
cont.resume(returning: nil)
} else {
unlock()
fatalError("attempt to await next() on more than one task")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phausler apologies for resurrecting this, but if you have a chance, could you help me understand why this check was removed? i assume it was not directly related to the conditional Sendable conformance since the same change wasn't made to the throwing variant, but maybe i'm overlooking something.

Copy link

@malhal malhal Jan 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phausler I'd like the fatalError back please.

Copy link

@CrownedPhoenix CrownedPhoenix Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a contrary opinion on this.

I would like this same behavior/changeset to be applied to the AsyncThrowingStream._Storage.

I believe that consuming a stream from multiple tasks should not be an error. The current behavior of AsyncStream in that scenario is to distribute the stream elements to each consumer somewhat randomly - which I think is perfectly reasonable behavior.

I don't think fatalError in this circumstance is necessary.

I'm sympathetic to the argument that multi-consumption is a violation of this API and should fail fast to alert the user to a possible bug. But I also believe that multi-consumption ought to be a feature - so if it isn't this API then it deserves to be first-classed in some other way.

Removing a fatalError here seems like a relatively low-risk (?) way to make progress on multi-consumption.

At any rate, AsyncStream and AsyncThrowing stream should definitely have the same behavior in a multi-consumer scenario - and right now they do not.

Copy link

@CrownedPhoenix CrownedPhoenix Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

@malhal malhal Jun 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multi-consumption means shared mutable state and the tool for that is actor.

Copy link

@CrownedPhoenix CrownedPhoenix Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what that has to do with the fatalError here?

Do you mean that instead of a lock and a check for multiple access, this should just be implemented with an actor? I would agree with that.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just responding to your statement that said multi consumer should be a feature, I just think it's impossible similar to how value types can't be copied and continue to have the same changes.

I mean if you want multi consumption you need to go reference type, and an actor would help with storing the mutable state that is all the clients.

}

}

func next() async -> Element? {
Expand Down Expand Up @@ -341,7 +338,7 @@ extension AsyncThrowingStream {
}
}

func cancel() {
@Sendable func cancel() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that one necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so since it is used in the termination which is sendable

lock()
// swap out the handler before we invoke it to prevent double cancel
let handler = state.onTermination
Expand Down Expand Up @@ -595,3 +592,4 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
return storage
}
}

3 changes: 3 additions & 0 deletions stdlib/public/Concurrency/AsyncThrowingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,6 @@ extension AsyncThrowingStream.Continuation {
storage.yield(())
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }