Skip to content

Commit e595df4

Browse files
authored
Add a broadcast async sequence (grpc#1684)
Motivation: To support retries and hedging we need a way to buffer elements over time that can support multiple consumers concurrently and allows for consumers to start consuming after some elements have been produced. An `AsyncSequence` fits this quite naturally but we don't yet have a general purpose implementat that fits this requirement. This change adds `BroadcastAsyncSequence` which isn't a general purpose async sequence but instead is tailored to the needs of grpc for hedging and retries. This means it supports a low number of concurrent iterators and maintains a limited size internal buffer and drops the slowest consumers when the buffer becomes full. Modifications: - Add a `BroadcastAsyncSequence` and tests - Made a bunch of things inlinable/usableFromInline which necessitated a switch from `@_spi(Testing)` to `@testable` imports. - Rename the 'Stream' directory to 'Streaming' Result: - `BroadcastAsyncSequence` can be used to implement retries and hedging.
1 parent 6ccafcc commit e595df4

File tree

11 files changed

+2034
-14
lines changed

11 files changed

+2034
-14
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ extension Target {
158158
static let grpcCore: Target = .target(
159159
name: "GRPCCore",
160160
dependencies: [
161-
.dequeModule
161+
.dequeModule,
162162
],
163163
path: "Sources/GRPCCore"
164164
)

Sources/GRPCCore/Call/Client/ClientResponse.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,7 @@ extension ClientResponse.Stream {
371371

372372
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
373373
extension ClientResponse.Single {
374-
@_spi(Testing)
375-
public init(stream response: ClientResponse.Stream<Message>) async {
374+
init(stream response: ClientResponse.Stream<Message>) async {
376375
switch response.accepted {
377376
case .success(let contents):
378377
do {

Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift renamed to Sources/GRPCCore/Streaming/Internal/AsyncSequenceOfOne.swift

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,46 @@
1717
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1818
extension RPCAsyncSequence {
1919
/// Returns an ``RPCAsyncSequence`` containing just the given element.
20-
@_spi(Testing)
21-
public static func one(_ element: Element) -> Self {
20+
@inlinable
21+
static func one(_ element: Element) -> Self {
2222
return Self(wrapping: AsyncSequenceOfOne<Element, Never>(result: .success(element)))
2323
}
2424

2525
/// Returns an ``RPCAsyncSequence`` throwing the given error.
26-
@_spi(Testing)
27-
public static func throwing<E: Error>(_ error: E) -> Self {
26+
@inlinable
27+
static func throwing<E: Error>(_ error: E) -> Self {
2828
return Self(wrapping: AsyncSequenceOfOne<Element, E>(result: .failure(error)))
2929
}
3030
}
3131

3232
/// An `AsyncSequence` of a single value.
33+
@usableFromInline
3334
@available(macOS 10.15, iOS 13.0, tvOS 13, watchOS 6, *)
34-
private struct AsyncSequenceOfOne<Element: Sendable, Failure: Error>: AsyncSequence {
35-
private let result: Result<Element, Failure>
35+
struct AsyncSequenceOfOne<Element: Sendable, Failure: Error>: AsyncSequence {
36+
@usableFromInline
37+
let result: Result<Element, Failure>
3638

39+
@inlinable
3740
init(result: Result<Element, Failure>) {
3841
self.result = result
3942
}
4043

44+
@inlinable
4145
func makeAsyncIterator() -> AsyncIterator {
4246
AsyncIterator(result: self.result)
4347
}
4448

49+
@usableFromInline
4550
struct AsyncIterator: AsyncIteratorProtocol {
46-
private var result: Result<Element, Failure>?
51+
@usableFromInline
52+
private(set) var result: Result<Element, Failure>?
4753

48-
fileprivate init(result: Result<Element, Failure>) {
54+
@inlinable
55+
init(result: Result<Element, Failure>) {
4956
self.result = result
5057
}
5158

59+
@inlinable
5260
mutating func next() async throws -> Element? {
5361
guard let result = self.result else { return nil }
5462

0 commit comments

Comments
 (0)