|
| 1 | +# Merge |
| 2 | + |
| 3 | +[[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/Asyncmerge2Sequence.swift), [Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncMerge3Sequence.swift) | |
| 4 | +[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestMerge.swift)] |
| 5 | + |
| 6 | +Merges two or more asynchronous sequences sharing the same element type into one singular asynchronous sequence. |
| 7 | + |
| 8 | +```swift |
| 9 | +let appleFeed = URL("http://www.example.com/ticker?symbol=AAPL").lines.map { "AAPL: " + $0 } |
| 10 | +let nasdaqFeed = URL("http://www.example.com/ticker?symbol=^IXIC").lines.map { "^IXIC: " + $0 } |
| 11 | + |
| 12 | +for try await ticker = merge(appleFeed, nasdaqFeed) { |
| 13 | + print(ticker) |
| 14 | +} |
| 15 | +``` |
| 16 | + |
| 17 | +Given some sample inputs the following merged events can be expected. |
| 18 | + |
| 19 | +| Timestamp | appleFeed | nasdaqFeed | merged output | |
| 20 | +| ----------- | --------- | ---------- | --------------- | |
| 21 | +| 11:40 AM | 173.91 | | AAPL: 173.91 | |
| 22 | +| 12:25 AM | | 14236.78 | ^IXIC: 14236.78 | |
| 23 | +| 12:40 AM | | 14218.34 | ^IXIC: 14218.34 | |
| 24 | +| 1:15 PM | 173.00 | | AAPL: 173.00 | |
| 25 | + |
| 26 | +## Detailed Design |
| 27 | + |
| 28 | +This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted these will be implemented in terms of two and three base sequence cases. |
| 29 | + |
| 30 | +```swift |
| 31 | +public func merge<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncMerge2Sequence<Base1, Base2> |
| 32 | + |
| 33 | +public func merge<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncMerge3Sequence<Base1, Base2, Base3> |
| 34 | + |
| 35 | +public struct AsyncMerge2Sequence<Base1: AsyncSequence, Base2: AsyncSequence>: Sendable |
| 36 | + where |
| 37 | + Base1.Element == Base2.Element, |
| 38 | + Base1: Sendable, Base2: Sendable, |
| 39 | + Base1.Element: Sendable, Base2.Element: Sendable, |
| 40 | + Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable { |
| 41 | + public typealias Element = Base1.Element |
| 42 | + |
| 43 | + public struct Iterator: AsyncIteratorProtocol { |
| 44 | + public mutating func next() async rethrows -> Element? |
| 45 | + } |
| 46 | + |
| 47 | + public func makeAsyncIterator() -> Iterator |
| 48 | +} |
| 49 | + |
| 50 | +public struct AsyncMerge3Sequence<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>: Sendable |
| 51 | + where |
| 52 | + Base1.Element == Base2.Element, Base1.Element == Base3.Element, |
| 53 | + Base1: Sendable, Base2: Sendable, Base3: Sendable |
| 54 | + Base1.Element: Sendable, Base2.Element: Sendable, Base3.Element: Sendable |
| 55 | + Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable, Base3.AsyncIterator: Sendable { |
| 56 | + public typealias Element = Base1.Element |
| 57 | + |
| 58 | + public struct Iterator: AsyncIteratorProtocol { |
| 59 | + public mutating func next() async rethrows -> Element? |
| 60 | + } |
| 61 | + |
| 62 | + public func makeAsyncIterator() -> Iterator |
| 63 | +} |
| 64 | + |
| 65 | +``` |
| 66 | + |
| 67 | +The `merge(_:...)` function takes two or more asynchronous sequences as arguments with the resulting `AsyncMergeSequence` which is an asynchronous sequence. |
| 68 | + |
| 69 | +Since the bases comprising the `AsyncMergeSequence` must be iterated concurrently to produce the latest value it means that those sequences must be able to be sent to child tasks. This means that a prerequisite of the bases must be that the base asynchronous sequences, their iterators, and the elements they produce must be `Sendable`. |
| 70 | + |
| 71 | +When iterating a `AsyncMergeSequence` it becomes terminal when all of the base asynchronous sequences are terminal; meaning there is no more potential of any elements being produced. |
| 72 | + |
| 73 | +The throwing behavior of `AsyncMergeSequence` is that if any of the bases throw then the composed asynchronous sequence then throws on its iteration. If at any point an error is thrown by any base, the other iterations are cancelled and the thrown error is immediately thrown to the consuming iteration. |
| 74 | + |
| 75 | +### Naming |
| 76 | + |
| 77 | +Since the inherent behavior of `merge(_:...)` merges values from multiple streams into a singular asynchronous sequence the naming is intended to be quite literal. There are precedent terms of art in other frameworks and libraries (listed in the comparison section). Other naming takes the form of "withLatestFrom". This was disregarded since the "with" prefix is often most associated with the passing of a closure and some sort of contextual concept; `withUnsafePointer` or `withUnsafeContinuation` are prime examples. |
| 78 | + |
| 79 | +### Comparison with other libraries |
| 80 | + |
| 81 | +**ReactiveX** ReactiveX has an [API definition of Merge](https://reactivex.io/documentation/operators/merge.html) as a top level function for merging Observables. |
| 82 | + |
| 83 | +**Combine** Combine has an [API definition of merge(with:)](https://developer.apple.com/documentation/combine/publisher/merge(with:)-7qt71/) as an operator style method for merging Publishers. |
0 commit comments