Description
Description
As of main branch's 4d1d8a9, AsyncStream
's underlying AsyncStream._Storage.State.continuations
(array of continuations) management is slightly different from AsyncThrowingStream._Storage.State.continuations
(single optional continuation) which doesn't have fatalError("attempt to await next() on more than one task")
thus causing the following multi-consumption issue which .finished
event does not propagate correctly to all consumers:
@MainActor
func test() async throws {
let stream = AsyncStream<Int> { cont in
let task = Task {
try await Task.sleep(nanoseconds: 1_000_000)
cont.yield(1)
cont.finish()
}
cont.onTermination = { termination in
task.cancel()
}
}
var iter = stream.makeAsyncIterator()
for i in 1 ... 3 {
Task {
let value = await iter.next()
print("===> await \(i) = \(value as Any)")
}
}
try await Task.sleep(nanoseconds: 1_000_000_000)
}
Example console log:
===> await 1 = Optional(1)
===> await 3 = nil
/* NOTE: result varies for each run, but correct 3 awaits will rarerly be called */
The reason behind this is AsyncStream._Storage.finish()
is not sending .finished
events to all managed continuations but only first one.
However, due to the current nature of unshared AsyncStream by default, it might be simpler to just add fatalError("attempt to await next() on more than one task")
for AsyncStream
as well.
(Related: apple/swift-async-algorithms#110)
Expected behavior
Either one would be the expected behavior:
- Sending
.finished
events to all managed continuations inAsyncStream._Storage.finish()
- Add
fatalError("attempt to await next() on more than one task")
forAsyncStream
to explicitly tell multi-consumption is disallowed.