Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Guides/Merge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Merge

[[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/Asyncmerge2Sequence.swift), [Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncMerge3Sequence.swift) |
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestMerge.swift)]

Merges two or more asynchronous sequences sharing the same element type into one singular asynchronous sequence.

```swift
let appleFeed = URL("http://www.example.com/ticker?symbol=AAPL").lines.map { "AAPL: " + $0 }
let nasdaqFeed = URL("http://www.example.com/ticker?symbol=^IXIC").lines.map { "^IXIC: " + $0 }

for try await ticker = merge(appleFeed, nasdaqFeed) {
print(ticker)
}
```

Given some sample inputs the following merged events can be expected.

| Timestamp | appleFeed | nasdaqFeed | merged output |
| ----------- | --------- | ---------- | --------------- |
| 11:40 AM | 173.91 | | AAPL: 173.91 |
| 12:25 AM | | 14236.78 | ^IXIC: 14236.78 |
| 12:40 AM | | 14218.34 | ^IXIC: 14218.34 |
| 1:15 PM | 173.00 | | AAPL: 173.00 |

## Detailed Design

This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted these will be implemented in terms of two and three base sequence cases.

```swift
public func merge<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncMerge2Sequence<Base1, Base2>

public func merge<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncMerge3Sequence<Base1, Base2, Base3>

public struct AsyncMerge2Sequence<Base1: AsyncSequence, Base2: AsyncSequence>: Sendable
where
Base1.Element == Base2.Element,
Base1: Sendable, Base2: Sendable,
Base1.Element: Sendable, Base2.Element: Sendable,
Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable {
public typealias Element = Base1.Element

public struct Iterator: AsyncIteratorProtocol {
public mutating func next() async rethrows -> Element?
}

public func makeAsyncIterator() -> Iterator
}

public struct AsyncMerge3Sequence<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>: Sendable
where
Base1.Element == Base2.Element, Base1.Element == Base3.Element,
Base1: Sendable, Base2: Sendable, Base3: Sendable
Base1.Element: Sendable, Base2.Element: Sendable, Base3.Element: Sendable
Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable, Base3.AsyncIterator: Sendable {
public typealias Element = Base1.Element

public struct Iterator: AsyncIteratorProtocol {
public mutating func next() async rethrows -> Element?
}

public func makeAsyncIterator() -> Iterator
}

```

The `merge(_:...)` function takes two or more asynchronous sequences as arguments with the resulting `AsyncMergeSequence` which is an asynchronous sequence.

Since the bases comprising the `AsyncMergeSequence` must be iterated concurrently to produce the latest value it means that those sequences must be able to be sent to child tasks. This means that a prerequisite of the bases must be that the base asynchronous sequences, their iterators, and the elements they produce must be `Sendable`.

When iterating a `AsyncMergeSequence` it becomes terminal when all of the base asynchronous sequences are terminal; meaning there is no more potential of any elements being produced.

The throwing behavior of `AsyncMergeSequence` is that if any of the bases throw then the composed asynchronous sequence then throws on its iteration. If at any point an error is thrown by any base, the other iterations are cancelled and the thrown error is immediately thrown to the consuming iteration.

### Naming

Since the inherent behavior of `merge(_:...)` merges values from multiple streams into a singular asynchronous sequence the naming is intended to be quite literal. There are precedent terms of art in other frameworks and libraries (listed in the comparison section). Other naming takes the form of "withLatestFrom". This was disregarded since the "with" prefix is often most associated with the passing of a closure and some sort of contextual concept; `withUnsafePointer` or `withUnsafeContinuation` are prime examples.

### Comparison with other libraries

**ReactiveX** ReactiveX has an [API definition of Merge](https://reactivex.io/documentation/operators/merge.html) as a top level function for merging Observables.

**Combine** Combine has an [API definition of merge(with:)](https://developer.apple.com/documentation/combine/publisher/merge(with:)-7qt71/) as an operator style method for merging Publishers.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

- [`chain(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Guides/Chain.md): Concatenates two or more asynchronous sequences with the same element type.
- [`combineLatest(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Guides/CombineLatest.md): Combines two or more asynchronous sequences into an asynchronous sequence producing a tuple of elements from those base asynchronous sequences that updates when any of the base sequences produce a value.
- [`merge(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Guides/): Merges two or more asynchronous sequence into a single asynchronous sequence producing the elements of all of the underlying asynchronous sequences.
- [`merge(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Guides/Merge.md): Merges two or more asynchronous sequence into a single asynchronous sequence producing the elements of all of the underlying asynchronous sequences.
- [`zip(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Guides/): Creates an asynchronous sequence of pairs built out of underlying asynchronous sequences.

#### Creating asynchronous sequences
Expand Down