Skip to content

Enable Sendability for AsyncStream and AsyncThrowingStream #41713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,6 @@ extension AsyncStream.Continuation {
return storage.yield(())
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream: @unchecked Sendable where Element: Sendable { }
46 changes: 22 additions & 24 deletions stdlib/public/Concurrency/AsyncStreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Swift
import Darwin

func _lockWordCount() -> Int {
let sz =
let sz =
MemoryLayout<os_unfair_lock>.size / MemoryLayout<UnsafeRawPointer>.size
return max(sz, 1)
}
Expand Down Expand Up @@ -57,7 +57,7 @@ extension AsyncStream {
typealias TerminationHandler = @Sendable (Continuation.Termination) -> Void

struct State {
var continuation: UnsafeContinuation<Element?, Never>?
var continuations = [UnsafeContinuation<Element?, Never>]()
var pending = _Deque<Element>()
let limit: Continuation.BufferingPolicy
var onTermination: TerminationHandler?
Expand Down Expand Up @@ -105,7 +105,7 @@ extension AsyncStream {
}
}

func cancel() {
@Sendable func cancel() {
lock()
// swap out the handler before we invoke it to prevent double cancel
let handler = state.onTermination
Expand All @@ -123,7 +123,9 @@ extension AsyncStream {
lock()
let limit = state.limit
let count = state.pending.count
if let continuation = state.continuation {

if !state.continuations.isEmpty {
let continuation = state.continuations.removeFirst()
if count > 0 {
if !state.terminal {
switch limit {
Expand Down Expand Up @@ -151,17 +153,14 @@ extension AsyncStream {
} else {
result = .terminated
}
state.continuation = nil
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if state.terminal {
state.continuation = nil
result = .terminated
unlock()
continuation.resume(returning: nil)
} else {
state.continuation = nil
switch limit {
case .unbounded:
result = .enqueued(remaining: .max)
Expand Down Expand Up @@ -212,15 +211,15 @@ extension AsyncStream {
state.onTermination = nil
state.terminal = true

if let continuation = state.continuation {
if let continuation = state.continuations.first {
if state.pending.count > 0 {
state.continuation = nil
state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: toSend)
} else if state.terminal {
state.continuation = nil
state.continuations.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: nil)
Expand All @@ -236,22 +235,20 @@ extension AsyncStream {

func next(_ continuation: UnsafeContinuation<Element?, Never>) {
lock()
if state.continuation == nil {
if state.pending.count > 0 {
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if state.terminal {
unlock()
continuation.resume(returning: nil)
} else {
state.continuation = continuation
unlock()
}
state.continuations.append(continuation)
if state.pending.count > 0 {
let cont = state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
cont.resume(returning: toSend)
} else if state.terminal {
let cont = state.continuations.removeFirst()
unlock()
cont.resume(returning: nil)
} else {
unlock()
fatalError("attempt to await next() on more than one task")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phausler apologies for resurrecting this, but if you have a chance, could you help me understand why this check was removed? i assume it was not directly related to the conditional Sendable conformance since the same change wasn't made to the throwing variant, but maybe i'm overlooking something.

Copy link

@malhal malhal Jan 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phausler I'd like the fatalError back please.

}

}

func next() async -> Element? {
Expand Down Expand Up @@ -341,7 +338,7 @@ extension AsyncThrowingStream {
}
}

func cancel() {
@Sendable func cancel() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that one necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so since it is used in the termination which is sendable

lock()
// swap out the handler before we invoke it to prevent double cancel
let handler = state.onTermination
Expand Down Expand Up @@ -595,3 +592,4 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
return storage
}
}

3 changes: 3 additions & 0 deletions stdlib/public/Concurrency/AsyncThrowingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,6 @@ extension AsyncThrowingStream.Continuation {
storage.yield(())
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }