Skip to content

[stdlib]: Propagate AsyncStream termination to all consumers #75878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions stdlib/public/Concurrency/AsyncStreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,21 @@ extension AsyncStream {
state.onTermination = nil
state.terminal = true

if let continuation = state.continuations.first {
if state.pending.count > 0 {
state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: toSend)
} else if state.terminal {
state.continuations.removeFirst()
unlock()
handler?(.finished)
continuation.resume(returning: nil)
} else {
unlock()
handler?(.finished)
}
} else {
guard !state.continuations.isEmpty else {
unlock()
handler?(.finished)
return
}

// Hold on to the continuations to resume outside the lock.
let continuations = state.continuations
state.continuations.removeAll()

unlock()
handler?(.finished)

for continuation in continuations {
continuation.resume(returning: nil)
}
}

Expand Down
34 changes: 34 additions & 0 deletions test/Concurrency/Runtime/async_stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,40 @@ class NotSendable {}
expectTrue(expectation.fulfilled)
}

// MARK: - Multiple consumers

tests.test("finish behavior with multiple consumers") {
let (stream, continuation) = AsyncStream<Int>.makeStream()
let (controlStream, controlContinuation) = AsyncStream<Int>.makeStream()
var controlIterator = controlStream.makeAsyncIterator()

func makeConsumingTaskWithIndex(_ index: Int) -> Task<Void, Never> {
Task { @MainActor in
controlContinuation.yield(index)
for await i in stream {
controlContinuation.yield(i)
}
}
}

// Set up multiple consumers
let consumer1 = makeConsumingTaskWithIndex(1)
expectEqual(await controlIterator.next(isolation: #isolation), 1)

let consumer2 = makeConsumingTaskWithIndex(2)
expectEqual(await controlIterator.next(isolation: #isolation), 2)

// Ensure the iterators are suspended
await MainActor.run {}

// Terminate the stream
continuation.finish()

// Ensure the consuming Tasks both complete
_ = await consumer1.value
_ = await consumer2.value
}

await runAllTestsAsync()
}
}
Expand Down