Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic client RPC executor #1693

Merged
merged 13 commits into from
Nov 1, 2023
6 changes: 6 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ let packageDependencies: [Package.Dependency] = [
url: "https://github.com/apple/swift-collections.git",
from: "1.0.5"
),
.package(
url: "https://github.com/apple/swift-atomics.git",
from: "1.2.0"
),
.package(
url: "https://github.com/apple/swift-protobuf.git",
from: "1.20.2"
Expand Down Expand Up @@ -125,6 +129,7 @@ extension Target.Dependency {
package: "swift-protobuf"
)
static let dequeModule: Self = .product(name: "DequeModule", package: "swift-collections")
static let atomics: Self = .product(name: "Atomics", package: "swift-atomics")

static let grpcCore: Self = .target(name: "GRPCCore")
}
Expand Down Expand Up @@ -224,6 +229,7 @@ extension Target {
dependencies: [
.grpcCore,
.dequeModule,
.atomics
]
)

Expand Down
12 changes: 0 additions & 12 deletions Sources/GRPCCore/Call/Client/ClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,3 @@ extension ClientRequest {
}
}
}

// MARK: - Conversion

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ClientRequest.Stream {
@_spi(Testing)
public init(single request: ClientRequest.Single<Message>) {
self.init(metadata: request.metadata) {
try await $0.write(request.message)
}
}
}
67 changes: 0 additions & 67 deletions Sources/GRPCCore/Call/Client/ClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -366,70 +366,3 @@ extension ClientResponse.Stream {
}
}
}

// MARK: - Conversion

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ClientResponse.Single {
init(stream response: ClientResponse.Stream<Message>) async {
switch response.accepted {
case .success(let contents):
do {
let metadata = contents.metadata
var iterator = contents.bodyParts.makeAsyncIterator()

// Happy path: message, trailing metadata, nil.
let part1 = try await iterator.next()
let part2 = try await iterator.next()
let part3 = try await iterator.next()

switch (part1, part2, part3) {
case (.some(.message(let message)), .some(.trailingMetadata(let trailingMetadata)), .none):
let contents = Contents(
metadata: metadata,
message: message,
trailingMetadata: trailingMetadata
)
self.accepted = .success(contents)

case (.some(.message), .some(.message), _):
let error = RPCError(
code: .unimplemented,
message: """
Multiple messages received, but only one is expected. The server may have \
incorrectly implemented the RPC or the client and server may have a different \
opinion on whether this RPC streams responses.
"""
)
self.accepted = .failure(error)

case (.some(.trailingMetadata), .none, .none):
let error = RPCError(
code: .unimplemented,
message: "No messages received, exactly one was expected."
)
self.accepted = .failure(error)

case (_, _, _):
let error = RPCError(
code: .internalError,
message: """
The stream from the client transport is invalid. This is likely to be an incorrectly \
implemented transport. Received parts: \([part1, part2, part3])."
"""
)
self.accepted = .failure(error)
}
} catch let error as RPCError {
// Known error type.
self.accepted = .failure(error)
} catch {
// Unexpected, but should be handled nonetheless.
self.accepted = .failure(RPCError(code: .unknown, message: String(describing: error)))
}

case .failure(let error):
self.accepted = .failure(error)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension ClientRPCExecutor {
/// An executor for requests which doesn't apply retries or hedging. The request has just one
/// attempt at execution.
@usableFromInline
struct OneShotExecutor<
Transport: ClientTransport,
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
> {
@usableFromInline
typealias Input = Serializer.Message
@usableFromInline
typealias Output = Deserializer.Message

@usableFromInline
let transport: Transport
@usableFromInline
let timeout: Duration?
@usableFromInline
let interceptors: [any ClientInterceptor]
@usableFromInline
let serializer: Serializer
@usableFromInline
let deserializer: Deserializer

@inlinable
init(
transport: Transport,
timeout: Duration?,
interceptors: [any ClientInterceptor],
serializer: Serializer,
deserializer: Deserializer
) {
self.transport = transport
self.timeout = timeout
self.interceptors = interceptors
self.serializer = serializer
self.deserializer = deserializer
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension ClientRPCExecutor.OneShotExecutor {
@inlinable
func execute<R>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I believe we usually try to avoid using single letters for generic types.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the obvious name is "Result" but that becomes ambiguous in these instances with Swift.Result. Any naming suggestions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah Result would probably be confusing. Is Response equally bad?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's equally bad because it's not the response, it's whatever the user returns as a result of handling the response. It doesn't necessarily have to be related to the response at all. I think R is fine given that this is internal, it isn't ambiguous with anything else and we can always change it when we come up with a better name.

request: ClientRequest.Stream<Input>,
method: MethodDescriptor,
responseHandler: @Sendable @escaping (ClientResponse.Stream<Output>) async throws -> R
) async throws -> R {
let result = await withTaskGroup(
of: _OneShotExecutorTask<R>.self,
returning: Result<R, Error>.self
) { group in
if let timeout = self.timeout {
group.addTask {
let result = await Result {
try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous)
}
return .timedOut(result)
}
}

let streamExecutor = ClientStreamExecutor(transport: self.transport)
group.addTask {
return .streamExecutorCompleted(await streamExecutor.run())
}

group.addTask {
let response = await ClientRPCExecutor.unsafeExecute(
request: request,
method: method,
attempt: 1,
serializer: self.serializer,
deserializer: self.deserializer,
interceptors: self.interceptors,
streamProcessor: streamExecutor
)

let result = await Result {
try await responseHandler(response)
}

return .responseHandled(result)
}

while let result = await group.next() {
switch result {
case .streamExecutorCompleted(.success):
// Stream finished; wait for the response to be handled.
()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style question: do we prefer/is there any difference/advantage to () versus continue or break in this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the difference is that () is "do nothing" whereas break and continue alter the flow control.

Using () if there was other code in the while loop beyond the switch then we'd go to that. I think break would do the same (but I'd always doubt myself and wonder if it would break the while loop instead). continue would skip any code after the switch so there are slight differences. I'd say () and break are safer in that regard. I'd only reach for continue if there was code afterwards that I explicitly wanted to skip.

I don't think we have a preferred style, but the style in NIO was always to use () over break so I write it without thinking now.


case .streamExecutorCompleted(.failure):
// Stream execution threw: cancel and wait.
group.cancelAll()
()
glbrntt marked this conversation as resolved.
Show resolved Hide resolved

case .timedOut(.success):
// The deadline passed; cancel the ongoing work group.
group.cancelAll()

case .timedOut(.failure):
// The deadline task failed (because the task was cancelled). Wait for the response
// to be handled.
()

case .responseHandled(let result):
// Response handled: cancel any other remaining tasks.
group.cancelAll()
return result
}
}

// Unreachable: exactly one task returns `responseHandled` and we return when it completes.
fatalError("Internal inconsistency")
}

return try result.get()
}
}

@usableFromInline
enum _OneShotExecutorTask<R> {
case streamExecutorCompleted(Result<Void, RPCError>)
case timedOut(Result<Void, Error>)
case responseHandled(Result<R, Error>)
}
Loading
Loading