- 
                Notifications
    
You must be signed in to change notification settings  - Fork 435
 
Add a broadcast async sequence #1684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Motivation: To support retries and hedging we need a way to buffer elements over time that can support multiple consumers concurrently and allows for consumers to start consuming after some elements have been produced. An `AsyncSequence` fits this quite naturally but we don't yet have a general purpose implementat that fits this requirement. This change adds `BroadcastAsyncSequence` which isn't a general purpose async sequence but instead is tailored to the needs of grpc for hedging and retries. This means it supports a low number of concurrent iterators and maintains a limited size internal buffer and drops the slowest consumers when the buffer becomes full. Modifications: - Add a `BroadcastAsyncSequence` and tests - Made a bunch of things inlinable/usableFromInline which necessitated a switch from `@_spi(Testing)` to `@testable` imports. - Rename the 'Stream' directory to 'Streaming' Result: - `BroadcastAsyncSequence` can be used to implement retries and hedging.
        
          
                Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
| extension BroadcastAsyncSequence { | ||
| @usableFromInline | ||
| struct Continuation: Sendable { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this to Source please. We have been leaning towards Source lately since Continuation is very overloaded in Swift
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will update the naming.
        
          
                Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | init(_storage: _BroadcastSequenceStorage<Element>) { | ||
| self._storage = _storage | ||
| } | ||
| 
               | 
          
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we want a deinit here as well so we can differentiate when the producer and when the consumer go away?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comes back to being not general purpose: users never touch this and we control it carefully and will always finish it in the right place which lets us avoid allocating here.
| /// one subscriber at a time, for hedging there may be at most five subscribers at a time. | ||
| @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
| @usableFromInline | ||
| struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I am a bit surprised that this is a root asynchronous sequence and not a transformational one. In my mind broadcast should always be transformational async sequence that you can apply to something like an AsyncStream or AsyncBufferredStream is there any particular reason why this is a root asynchronous sequence?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It better fits the use case here. A general purpose one probably would be transformational rather than a root. Mostly I want to avoid double buffering, but also since requests have a provider rather than an async sequence we can avoid a bunch of allocations by just creating the one async sequence.
Motivation:
To support retries and hedging we need a way to buffer elements over time that can support multiple consumers concurrently and allows for consumers to start consuming after some elements have been produced.
An
AsyncSequencefits this quite naturally but we don't yet have a general purpose implementat that fits this requirement. This change addsBroadcastAsyncSequencewhich isn't a general purpose async sequence but instead is tailored to the needs of grpc for hedging and retries. This means it supports a low number of concurrent iterators and maintains a limited size internal buffer and drops the slowest consumers when the buffer becomes full.Modifications:
BroadcastAsyncSequenceand tests@_spi(Testing)to@testableimports.Result:
BroadcastAsyncSequencecan be used to implement retries and hedging.