AsyncStream multi-consumptions does not propagate .finished event to all consumers #66541

Closed
@inamiy

Description

As of commit 4d1d8a9 on the main branch, AsyncStream manages its continuations slightly differently from AsyncThrowingStream: AsyncStream._Storage.State.continuations is an array of continuations, whereas AsyncThrowingStream._Storage.State.continuations is a single optional continuation. AsyncStream also lacks the fatalError("attempt to await next() on more than one task") guard. As a result, when a stream is consumed by multiple tasks, the .finished event does not propagate correctly to all consumers:

@MainActor
func test() async throws {
    let stream = AsyncStream<Int> { cont in
        let task = Task {
            try await Task.sleep(nanoseconds: 1_000_000)
            cont.yield(1)
            cont.finish()
        }
        cont.onTermination = { termination in
            task.cancel()
        }
    }

    var iter = stream.makeAsyncIterator()

    for i in 1 ... 3 {
        Task {
            let value = await iter.next()
            print("===> await \(i) = \(value as Any)")
        }
    }

    try await Task.sleep(nanoseconds: 1_000_000_000)
}

Example console log:

===> await 1 = Optional(1)
===> await 3 = nil

/* NOTE: the result varies from run to run, but all 3 awaits rarely complete */

The cause is that AsyncStream._Storage.finish() sends the .finished event only to the first managed continuation rather than to all of them.
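To make the intended behavior concrete, here is a minimal sketch of a storage whose finish() resumes every parked consumer. It is a toy, single-threaded model and not the real AsyncStream._Storage: the name ToyStorage is hypothetical, waiting consumers are modeled as plain callbacks instead of UnsafeContinuation, and locking and value buffering are left out.

```swift
// Toy model only; NOT the standard library implementation.
final class ToyStorage<Element> {
    private var waiters: [(Element?) -> Void] = []
    private var finished = false

    // Models a consumer calling next(): park until a value or the end arrives.
    func next(_ resume: @escaping (Element?) -> Void) {
        if finished {
            resume(nil)              // already terminal: answer immediately
        } else {
            waiters.append(resume)
        }
    }

    // Hands one value to the oldest parked consumer (no buffering in this sketch).
    @discardableResult
    func yield(_ value: Element) -> Bool {
        guard !finished, !waiters.isEmpty else { return false }
        let resume = waiters.removeFirst()
        resume(value)
        return true
    }

    // Resumes EVERY parked consumer with nil, not only the first one.
    func finish() {
        guard !finished else { return }
        finished = true
        let parked = waiters
        waiters.removeAll()
        for resume in parked { resume(nil) }
    }
}

// Three consumers wait, one value is yielded, then the stream finishes.
let storage = ToyStorage<Int>()
var results: [String] = []
for i in 1...3 {
    storage.next { value in
        results.append("await \(i) = \(String(describing: value))")
    }
}
storage.yield(1)
storage.finish()
```

In this model all three consumers are resumed: the first receives the value and the other two receive nil, which is the outcome the reproduction above fails to reach.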

However, since AsyncStream is currently not shared among consumers by default, it might be simpler to just add fatalError("attempt to await next() on more than one task") to AsyncStream as well.
(Related: apple/swift-async-algorithms#110)

Expected behavior

Either of the following would be the expected behavior:

  1. Send .finished events to all managed continuations in AsyncStream._Storage.finish().
  2. Add fatalError("attempt to await next() on more than one task") to AsyncStream to state explicitly that multi-consumption is disallowed.
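The second option could look roughly like the sketch below. Again this is a toy model under stated assumptions, not the standard library code: SingleConsumerStorage and ToyStreamError are hypothetical names, and the guard throws an error instead of calling fatalError so that the rejection can be observed without crashing the process.

```swift
// Toy model only; NOT the standard library implementation.
enum ToyStreamError: Error { case multipleAwaiters }

final class SingleConsumerStorage<Element> {
    private var waiter: ((Element?) -> Void)?   // at most one parked consumer
    private var finished = false

    // Models next(): rejects a second consumer while one is already parked.
    func next(_ resume: @escaping (Element?) -> Void) throws {
        if finished {
            resume(nil)
            return
        }
        // The guard proposed in the issue would be:
        // fatalError("attempt to await next() on more than one task")
        guard waiter == nil else { throw ToyStreamError.multipleAwaiters }
        waiter = resume
    }

    // With a single waiter there is nothing left behind on finish.
    func finish() {
        finished = true
        let parked = waiter
        waiter = nil
        parked?(nil)
    }
}

// The first consumer parks; the second is rejected immediately.
let single = SingleConsumerStorage<Int>()
var rejected = false
try single.next { _ in }
do {
    try single.next { _ in }
} catch {
    rejected = true
}
```

The trade-off is that misuse fails loudly at the second next() call instead of leaving some consumers suspended forever.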

Metadata

Labels: bug, concurrency
