- 
                Notifications
    You must be signed in to change notification settings 
- Fork 185
Description
Hi, I have been reading the documentation for this repository and it left me wondering: is this planned to introduce a feature to allow several tasks to consume the same AsyncSequence?
If this is not the right place to offer ideas, please let me know. I don't think this would fit in a proposal.
Context
I recently had to stream a file using URLSession and two pieces of code were interested in the streamed values. Rather than starting a stream twice which would not be efficient, I wanted to allow the stream to be consumed by several tasks. The data that were already streamed would be sent when a new consumption is set up.
Basic implementation
I tried to implement such a solution (in a gist) that will use a reduce function on the already emitted values and emit the result when a new task starts consuming the sequence. It's far from being perfect but I think it might help understanding the idea.
Implementation
struct ReducedReplayAsyncStream<Element> {
    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void
    private let storage: _Storage
    private var originalStream: AsyncStream<Element>
    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }
    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                var isFirst = false
                if await !storage.didStart {
                    await storage.setDidStart(true)
                    isFirst = true
                    startConsumingOriginalStream()
                }
                if !isFirst {
                    await continuation.yield(storage.stored)
                }
                await storage.appendContinuation(continuation)
            }
        }
    }
    private func startConsumingOriginalStream () {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.continuations.forEach { $0.finish() }
        }
    }
}
extension ReducedReplayAsyncStream {
    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce
        var didStart = false
        var stored: Element
        var continuations: [AsyncStream<Element>.Continuation] = []
        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }
        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations.forEach { $0.yield(value) }
        }
        func setDidStart(_ value: Bool) {
            didStart = value
        }
        func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) {
            continuations.append(continuation)
        }
    }
}
extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
    func makeAsyncIterator() -> AsyncIterator {
        let stream = makeStream()
        return stream.makeAsyncIterator()
    }
}Usage
var subscriptions: Set<AnyCancellable> = []
var continuation: Stream.Continuation!
let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult = partialResult + nextResult },
    build: { continuation = $0 }
)
var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        if counter == 10 {
            continuation.finish()
        }
        continuation.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)
Task {
    for await value in replayStream {
        print("[A]", value)
    }
}
Task {
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}Some considerations about efficiency can be found in the gist.