Skip to content

Commit 2f4681e

Browse files
committed
[stdlib]: Propagate AsyncStream termination to all consumers
As of the changes in #41713 to enable Sendability for AsyncStream, it has been possible to create multiple stream consumers operating concurrently. This change fixes behavior in the case that the underlying stream is terminated while multiple pending continuations are outstanding. Previously such consumers would have been leaked (never resumed). Now, they are notified of the stream's termination and resumed appropriately. Resolves #66541 & #71412
1 parent ffba6d1 commit 2f4681e

File tree

2 files changed

+44
-17
lines changed

2 files changed

+44
-17
lines changed

stdlib/public/Concurrency/AsyncStreamBuffer.swift

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,21 @@ extension AsyncStream {
212212
state.onTermination = nil
213213
state.terminal = true
214214

215-
if let continuation = state.continuations.first {
216-
if state.pending.count > 0 {
217-
state.continuations.removeFirst()
218-
let toSend = state.pending.removeFirst()
219-
unlock()
220-
handler?(.finished)
221-
continuation.resume(returning: toSend)
222-
} else if state.terminal {
223-
state.continuations.removeFirst()
224-
unlock()
225-
handler?(.finished)
226-
continuation.resume(returning: nil)
227-
} else {
228-
unlock()
229-
handler?(.finished)
230-
}
231-
} else {
215+
guard !state.continuations.isEmpty else {
232216
unlock()
233217
handler?(.finished)
218+
return
219+
}
220+
221+
// Hold on to the continuations to resume outside the lock.
222+
let continuations = state.continuations
223+
state.continuations = []
224+
225+
unlock()
226+
handler?(.finished)
227+
228+
for continuation in continuations {
229+
continuation.resume(returning: nil)
234230
}
235231
}
236232

test/Concurrency/Runtime/async_stream.swift

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,37 @@ class NotSendable {}
435435
expectTrue(expectation.fulfilled)
436436
}
437437

438+
// MARK: - Multiple consumers
439+
440+
tests.test("finish behavior with multiple consumers") {
441+
let (stream, continuation) = AsyncStream<Int>.makeStream()
442+
let (controlStream, controlContinuation) = AsyncStream<Int>.makeStream()
443+
var controlIterator = controlStream.makeAsyncIterator()
444+
445+
func makeConsumingTaskWithIndex(_ index: Int) -> Task<Void, Never> {
446+
Task {
447+
controlContinuation.yield(index)
448+
for await i in stream {
449+
controlContinuation.yield(i)
450+
}
451+
}
452+
}
453+
454+
// Wait for the consumers to be set up
455+
let consumer1 = makeConsumingTaskWithIndex(1)
456+
expectEqual(await controlIterator.next(isolation: #isolation), 1)
457+
458+
let consumer2 = makeConsumingTaskWithIndex(2)
459+
expectEqual(await controlIterator.next(isolation: #isolation), 2)
460+
461+
// Terminate the stream
462+
continuation.finish()
463+
464+
// Ensure the consuming Tasks both complete
465+
_ = await consumer1.value
466+
_ = await consumer2.value
467+
}
468+
438469
await runAllTestsAsync()
439470
}
440471
}

0 commit comments

Comments
 (0)