-
Notifications
You must be signed in to change notification settings - Fork 185
Add MultiProducerSingleConsumerChannel
#305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
FranzBusch
wants to merge
22
commits into
apple:main
Choose a base branch
from
FranzBusch:fb-async-backpressured-stream
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+4,718
−0
Open
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
3e7f2f6
Add `AsyncBackpressuredStream` proposal and implementation
FranzBusch 5fadfc8
Update proposal and implementation
FranzBusch d15c65b
Update proposal
FranzBusch 2a923f0
Add example project
FranzBusch 62ff386
Formatting
FranzBusch 717ee7d
Fix Swift 6.0 build
FranzBusch 4b7c077
Future direction for ~Copyable elements
FranzBusch 5372b3a
Apply formatting
FranzBusch b680d1a
Fix CI
FranzBusch 8744a4e
Move to 6.1 and update proposal
FranzBusch 2ace2a5
Guard tests
FranzBusch 7b2a9b6
Minor edits to the proposal
FranzBusch c7e149b
Fix revision order
FranzBusch 5f98315
FIxup setOnTerminationCallback
FranzBusch 8b1612f
Address review feedback
FranzBusch 7baec74
Rename to `MultiProducerSingleConsumerAsyncChannel`
FranzBusch 6e7a7a8
Allow one termination callback per source.
FranzBusch f4c3a36
Fix all sendable warnings
FranzBusch 786370b
Remove unbounded strategy, rename copy -> makeAdditionalSource, renam…
FranzBusch 91f7e78
Remove Example and fix docs and fix format
FranzBusch 062b6a4
fixes grammar and some word order issues
heckj a68ef61
Merge pull request #3 from heckj/grammar-fixes
FranzBusch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| root = true | ||
|
|
||
| [*] | ||
| indent_style = space | ||
| indent_size = 2 | ||
| end_of_line = lf | ||
| insert_final_newline = true | ||
| trim_trailing_whitespace = true |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
838 changes: 838 additions & 0 deletions
838
Evolution/0016-mutli-producer-single-consumer-channel.md
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,329 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift Async Algorithms open source project | ||
| // | ||
| // Copyright (c) 2023 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the SwiftCertificates open source project | ||
| // | ||
| // Copyright (c) 2023 Apple Inc. and the SwiftCertificates project authors | ||
| // Licensed under Apache License v2.0 | ||
| // | ||
| // See LICENSE.txt for license information | ||
| // See CONTRIBUTORS.txt for the list of SwiftCertificates project authors | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| /// ``_TinyArray`` is a ``RandomAccessCollection`` optimised to store zero or one ``Element``. | ||
| /// It supports arbitrary many elements but if only up to one ``Element`` is stored it does **not** allocate separate storage on the heap | ||
| /// and instead stores the ``Element`` inline. | ||
| @usableFromInline | ||
| struct _TinyArray<Element> { | ||
| @usableFromInline | ||
| enum Storage { | ||
| case one(Element) | ||
| case arbitrary([Element]) | ||
| } | ||
|
|
||
| @usableFromInline | ||
| var storage: Storage | ||
| } | ||
|
|
||
| // MARK: - TinyArray "public" interface | ||
|
|
||
| extension _TinyArray: Equatable where Element: Equatable {} | ||
| extension _TinyArray: Hashable where Element: Hashable {} | ||
| extension _TinyArray: Sendable where Element: Sendable {} | ||
|
|
||
| extension _TinyArray: RandomAccessCollection { | ||
| @usableFromInline | ||
| typealias Element = Element | ||
|
|
||
| @usableFromInline | ||
| typealias Index = Int | ||
|
|
||
| @inlinable | ||
| subscript(position: Int) -> Element { | ||
| get { | ||
| self.storage[position] | ||
| } | ||
| set { | ||
| self.storage[position] = newValue | ||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| var startIndex: Int { | ||
| self.storage.startIndex | ||
| } | ||
|
|
||
| @inlinable | ||
| var endIndex: Int { | ||
| self.storage.endIndex | ||
| } | ||
| } | ||
|
|
||
| extension _TinyArray { | ||
| @inlinable | ||
| init(_ elements: some Sequence<Element>) { | ||
| self.storage = .init(elements) | ||
| } | ||
|
|
||
| @inlinable | ||
| init() { | ||
| self.storage = .init() | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(_ newElement: Element) { | ||
| self.storage.append(newElement) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(contentsOf newElements: some Sequence<Element>) { | ||
| self.storage.append(contentsOf: newElements) | ||
| } | ||
|
|
||
| @discardableResult | ||
| @inlinable | ||
| mutating func remove(at index: Int) -> Element { | ||
| self.storage.remove(at: index) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { | ||
| try self.storage.removeAll(where: shouldBeRemoved) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { | ||
| try self.storage.sort(by: areInIncreasingOrder) | ||
| } | ||
| } | ||
|
|
||
| // MARK: - TinyArray.Storage "private" implementation | ||
|
|
||
| extension _TinyArray.Storage: Equatable where Element: Equatable { | ||
| @inlinable | ||
| static func == (lhs: Self, rhs: Self) -> Bool { | ||
| switch (lhs, rhs) { | ||
| case (.one(let lhs), .one(let rhs)): | ||
| return lhs == rhs | ||
| case (.arbitrary(let lhs), .arbitrary(let rhs)): | ||
| // we don't use lhs.elementsEqual(rhs) so we can hit the fast path from Array | ||
| // if both arrays share the same underlying storage: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1775 | ||
| return lhs == rhs | ||
|
|
||
| case (.one(let element), .arbitrary(let array)), | ||
| (.arbitrary(let array), .one(let element)): | ||
| guard array.count == 1 else { | ||
| return false | ||
| } | ||
| return element == array[0] | ||
|
|
||
| } | ||
| } | ||
| } | ||
| extension _TinyArray.Storage: Hashable where Element: Hashable { | ||
| @inlinable | ||
| func hash(into hasher: inout Hasher) { | ||
| // same strategy as Array: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1801 | ||
| hasher.combine(count) | ||
| for element in self { | ||
| hasher.combine(element) | ||
| } | ||
| } | ||
| } | ||
| extension _TinyArray.Storage: Sendable where Element: Sendable {} | ||
|
|
||
| extension _TinyArray.Storage: RandomAccessCollection { | ||
| @inlinable | ||
| subscript(position: Int) -> Element { | ||
| get { | ||
| switch self { | ||
| case .one(let element): | ||
| guard position == 0 else { | ||
| fatalError("index \(position) out of bounds") | ||
| } | ||
| return element | ||
| case .arbitrary(let elements): | ||
| return elements[position] | ||
| } | ||
| } | ||
| set { | ||
| switch self { | ||
| case .one: | ||
| guard position == 0 else { | ||
| fatalError("index \(position) out of bounds") | ||
| } | ||
| self = .one(newValue) | ||
| case .arbitrary(var elements): | ||
| elements[position] = newValue | ||
| self = .arbitrary(elements) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| var startIndex: Int { | ||
| 0 | ||
| } | ||
|
|
||
| @inlinable | ||
| var endIndex: Int { | ||
| switch self { | ||
| case .one: return 1 | ||
| case .arbitrary(let elements): return elements.endIndex | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension _TinyArray.Storage { | ||
| @inlinable | ||
| init(_ elements: some Sequence<Element>) { | ||
| var iterator = elements.makeIterator() | ||
| guard let firstElement = iterator.next() else { | ||
| self = .arbitrary([]) | ||
| return | ||
| } | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements just contains a single element | ||
| // and we hit the fast path | ||
| self = .one(firstElement) | ||
| return | ||
| } | ||
|
|
||
| var elements: [Element] = [] | ||
| elements.reserveCapacity(elements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| while let nextElement = iterator.next() { | ||
| elements.append(nextElement) | ||
| } | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| @inlinable | ||
| init() { | ||
| self = .arbitrary([]) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(_ newElement: Element) { | ||
| self.append(contentsOf: CollectionOfOne(newElement)) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(contentsOf newElements: some Sequence<Element>) { | ||
| switch self { | ||
| case .one(let firstElement): | ||
| var iterator = newElements.makeIterator() | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements is empty, nothing to do | ||
| return | ||
| } | ||
| var elements: [Element] = [] | ||
| elements.reserveCapacity(1 + newElements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| elements.appendRemainingElements(from: &iterator) | ||
| self = .arbitrary(elements) | ||
|
|
||
| case .arbitrary(var elements): | ||
| if elements.isEmpty { | ||
| // if `self` is currently empty and `newElements` just contains a single | ||
| // element, we skip allocating an array and set `self` to `.one(firstElement)` | ||
| var iterator = newElements.makeIterator() | ||
| guard let firstElement = iterator.next() else { | ||
| // newElements is empty, nothing to do | ||
| return | ||
| } | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements just contains a single element | ||
| // and we hit the fast path | ||
| self = .one(firstElement) | ||
| return | ||
| } | ||
| elements.reserveCapacity(elements.count + newElements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| elements.appendRemainingElements(from: &iterator) | ||
| self = .arbitrary(elements) | ||
|
|
||
| } else { | ||
| elements.append(contentsOf: newElements) | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @discardableResult | ||
| @inlinable | ||
| mutating func remove(at index: Int) -> Element { | ||
| switch self { | ||
| case .one(let oldElement): | ||
| guard index == 0 else { | ||
| fatalError("index \(index) out of bounds") | ||
| } | ||
| self = .arbitrary([]) | ||
| return oldElement | ||
|
|
||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
| return elements.remove(at: index) | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { | ||
| switch self { | ||
| case .one(let oldElement): | ||
| if try shouldBeRemoved(oldElement) { | ||
| self = .arbitrary([]) | ||
| } | ||
|
|
||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
| return try elements.removeAll(where: shouldBeRemoved) | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { | ||
| switch self { | ||
| case .one: | ||
| // a collection of just one element is always sorted, nothing to do | ||
| break | ||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| try elements.sort(by: areInIncreasingOrder) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension Array { | ||
| @inlinable | ||
| mutating func appendRemainingElements(from iterator: inout some IteratorProtocol<Element>) { | ||
| while let nextElement = iterator.next() { | ||
| append(nextElement) | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this could perhaps be subsumed by
InlineArrayinstead since the original intent was to have a faster storage (this is not a blocking concept)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InlineArrayisn't capable of fully replacing the need here. The idea behindTinyArrayis to have a fast path for a single element that doesn't allocate and one that allocates if there are more. Since we can't tell at compile time how many producers we have we need this runtime dynamism.