-
Notifications
You must be signed in to change notification settings - Fork 420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic client RPC executor #1693
Changes from 4 commits
bbfb993
463ed93
c7deb7d
c772637
f68d023
d42978c
f8d8ebd
834f105
431da59
924f004
5ee3de3
8545e25
113f379
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Copyright 2023, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) | ||
extension ClientRPCExecutor { | ||
/// An executor for requests which doesn't apply retries or hedging. The request has just one | ||
/// attempt at execution. | ||
@usableFromInline | ||
struct OneShotExecutor< | ||
Transport: ClientTransport, | ||
Serializer: MessageSerializer, | ||
Deserializer: MessageDeserializer | ||
> { | ||
@usableFromInline | ||
typealias Input = Serializer.Message | ||
@usableFromInline | ||
typealias Output = Deserializer.Message | ||
|
||
@usableFromInline | ||
let transport: Transport | ||
@usableFromInline | ||
let timeout: Duration? | ||
@usableFromInline | ||
let interceptors: [any ClientInterceptor] | ||
@usableFromInline | ||
let serializer: Serializer | ||
@usableFromInline | ||
let deserializer: Deserializer | ||
|
||
@inlinable | ||
init( | ||
transport: Transport, | ||
timeout: Duration?, | ||
interceptors: [any ClientInterceptor], | ||
serializer: Serializer, | ||
deserializer: Deserializer | ||
) { | ||
self.transport = transport | ||
self.timeout = timeout | ||
self.interceptors = interceptors | ||
self.serializer = serializer | ||
self.deserializer = deserializer | ||
} | ||
} | ||
} | ||
|
||
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) | ||
extension ClientRPCExecutor.OneShotExecutor { | ||
@inlinable | ||
func execute<R>( | ||
request: ClientRequest.Stream<Input>, | ||
method: MethodDescriptor, | ||
responseHandler: @Sendable @escaping (ClientResponse.Stream<Output>) async throws -> R | ||
) async throws -> R { | ||
let result = await withTaskGroup( | ||
of: _OneShotExecutorTask<R>.self, | ||
returning: Result<R, Error>.self | ||
) { group in | ||
if let timeout = self.timeout { | ||
group.addTask { | ||
let result = await Result { | ||
try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous) | ||
} | ||
return .timedOut(result) | ||
} | ||
} | ||
|
||
let streamExecutor = ClientStreamExecutor(transport: self.transport) | ||
group.addTask { | ||
return .streamExecutorCompleted(await streamExecutor.run()) | ||
} | ||
|
||
group.addTask { | ||
let response = await ClientRPCExecutor.unsafeExecute( | ||
request: request, | ||
method: method, | ||
attempt: 1, | ||
serializer: self.serializer, | ||
deserializer: self.deserializer, | ||
interceptors: self.interceptors, | ||
streamProcessor: streamExecutor | ||
) | ||
|
||
let result = await Result { | ||
try await responseHandler(response) | ||
} | ||
|
||
return .responseHandled(result) | ||
} | ||
|
||
while let result = await group.next() { | ||
switch result { | ||
case .streamExecutorCompleted(.success): | ||
// Stream finished; wait for the response to be handled. | ||
() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style question: do we prefer/is there any difference/advantage to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess the difference is that Using I don't think we have a preferred style, but the style in NIO was always to use |
||
|
||
case .streamExecutorCompleted(.failure): | ||
// Stream execution threw: cancel and wait. | ||
group.cancelAll() | ||
() | ||
glbrntt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
case .timedOut(.success): | ||
// The deadline passed; cancel the ongoing work group. | ||
group.cancelAll() | ||
|
||
case .timedOut(.failure): | ||
// The deadline task failed (because the task was cancelled). Wait for the response | ||
// to be handled. | ||
() | ||
|
||
case .responseHandled(let result): | ||
// Response handled: cancel any other remaining tasks. | ||
group.cancelAll() | ||
return result | ||
} | ||
} | ||
|
||
// Unreachable: exactly one task returns `responseHandled` and we return when it completes. | ||
fatalError("Internal inconsistency") | ||
} | ||
|
||
return try result.get() | ||
} | ||
} | ||
|
||
@usableFromInline | ||
enum _OneShotExecutorTask<R> { | ||
case streamExecutorCompleted(Result<Void, RPCError>) | ||
case timedOut(Result<Void, Error>) | ||
case responseHandled(Result<R, Error>) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I believe we usually try to avoid using single letters for generic types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, the obvious name is "Result" but that becomes ambiguous in these instances with
Swift.Result
. Any naming suggestions?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah
Result
would probably be confusing. IsResponse
equally bad?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's equally bad because it's not the response, it's whatever the user returns as a result of handling the response. It doesn't necessarily have to be related to the response at all. I think
R
is fine given that this is internal, it isn't ambiguous with anything else and we can always change it when we come up with a better name.