Skip to content

Remove the requirement that messages conform to GRPCPayload on the server #886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Sources/Examples/Echo/Model/echo.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,26 @@ extension Echo_EchoProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "Get":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.get(request: request, context: context)
}
}

case "Expand":
return ServerStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeServerStreaming(callHandlerContext: callHandlerContext) { context in
return { request in
self.expand(request: request, context: context)
}
}

case "Collect":
return ClientStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeClientStreaming(callHandlerContext: callHandlerContext) { context in
return self.collect(context: context)
}

case "Update":
return BidirectionalStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeBidirectionalStreaming(callHandlerContext: callHandlerContext) { context in
return self.update(context: context)
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/Examples/HelloWorld/Model/helloworld.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ extension Helloworld_GreeterProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "SayHello":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.sayHello(request: request, context: context)
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/Examples/RouteGuide/Model/route_guide.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -195,26 +195,26 @@ extension Routeguide_RouteGuideProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "GetFeature":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.getFeature(request: request, context: context)
}
}

case "ListFeatures":
return ServerStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeServerStreaming(callHandlerContext: callHandlerContext) { context in
return { request in
self.listFeatures(request: request, context: context)
}
}

case "RecordRoute":
return ClientStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeClientStreaming(callHandlerContext: callHandlerContext) { context in
return self.recordRoute(context: context)
}

case "RouteChat":
return BidirectionalStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeBidirectionalStreaming(callHandlerContext: callHandlerContext) { context in
return self.routeChat(context: context)
}

Expand Down
19 changes: 9 additions & 10 deletions Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the status, complete `context.statusPromise`.
public class BidirectionalStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public class BidirectionalStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = StreamingResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -39,15 +36,17 @@ public class BidirectionalStreamingCallHandler<

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
) {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
81 changes: 81 additions & 0 deletions Sources/GRPC/CallHandlers/CallHandlerFactory.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO
import SwiftProtobuf

// We can't use a 'where' clause on 'init's to constrain the generic requirements of a type. Instead
// we'll use static methods on this factory.
public enum CallHandlerFactory {
public typealias UnaryContext<Response> = UnaryResponseCallContext<Response>
public typealias UnaryEventObserver<Request, Response> = (Request) -> EventLoopFuture<Response>

public static func makeUnary<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (UnaryContext<Response>) -> UnaryEventObserver<Request, Response>
) -> UnaryCallHandler<Request, Response> {
return UnaryCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias ClientStreamingContext<Response> = UnaryResponseCallContext<Response>
public typealias ClientStreamingEventObserver<Request> = EventLoopFuture<(StreamEvent<Request>) -> Void>

public static func makeClientStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (ClientStreamingContext<Response>) -> ClientStreamingEventObserver<Request>
) -> ClientStreamingCallHandler<Request, Response> {
return ClientStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias ServerStreamingContext<Response> = StreamingResponseCallContext<Response>
public typealias ServerStreamingEventObserver<Request> = (Request) -> EventLoopFuture<GRPCStatus>

public static func makeServerStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (ServerStreamingContext<Response>) -> ServerStreamingEventObserver<Request>
) -> ServerStreamingCallHandler<Request, Response> {
return ServerStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias BidirectionalStreamingContext<Response> = StreamingResponseCallContext<Response>
public typealias BidirectionalStreamingEventObserver<Request> = EventLoopFuture<(StreamEvent<Request>) -> Void>

public static func makeBidirectionalStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (BidirectionalStreamingContext<Response>) -> BidirectionalStreamingEventObserver<Request>
) -> BidirectionalStreamingCallHandler<Request, Response> {
return BidirectionalStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}
}
17 changes: 11 additions & 6 deletions Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the response, complete `context.responsePromise`.
public final class ClientStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ClientStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = UnaryResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -39,9 +36,17 @@ public final class ClientStreamingCallHandler<

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(callHandlerContext: CallHandlerContext, eventObserverFactory: @escaping EventObserverFactory) {
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping EventObserverFactory
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
19 changes: 9 additions & 10 deletions Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,24 @@ import Logging
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, complete the status future returned by the observer block.
public final class ServerStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ServerStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<GRPCStatus>

private var eventObserver: EventObserver?
private var callContext: StreamingResponseCallContext<ResponsePayload>?
private let eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver

public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
) {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

override internal func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
16 changes: 9 additions & 7 deletions Sources/GRPC/CallHandlers/UnaryCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import Logging
/// - The observer block is implemented by the framework user and returns a future containing the call result.
/// - To return a response to the client, the framework user should complete that future
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
public final class UnaryCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class UnaryCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
private var eventObserver: EventObserver?
private var callContext: UnaryResponseCallContext<ResponsePayload>?
private let eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver

public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
) {
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
23 changes: 11 additions & 12 deletions Sources/GRPC/CallHandlers/_BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,8 @@ import Logging
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
/// - Important: This is **NOT** part of the public API.
public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
public func makeGRPCServerCodec() -> ChannelHandler {
return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(
encoding: self.callHandlerContext.encoding,
logger: self.logger
)
}
public class _BaseCallHandler<Request, Response>: GRPCCallHandler {
public let _codec: ChannelHandler

/// Called when the request head has been received.
///
Expand All @@ -41,7 +36,7 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
/// Called whenever a message has been received.
///
/// Overridden by subclasses.
internal func processMessage(_ message: RequestPayload) throws {
internal func processMessage(_ message: Request) throws {
fatalError("needs to be overridden")
}

Expand Down Expand Up @@ -69,13 +64,17 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
return self.callHandlerContext.logger
}

internal init(callHandlerContext: CallHandlerContext) {
internal init(
callHandlerContext: CallHandlerContext,
codec: ChannelHandler
) {
self.callHandlerContext = callHandlerContext
self._codec = codec
}
}

extension _BaseCallHandler: ChannelInboundHandler {
public typealias InboundIn = _GRPCServerRequestPart<RequestPayload>
public typealias InboundIn = _GRPCServerRequestPart<Request>

/// Passes errors to the user-provided `errorHandler`. After an error has been received an
/// appropriate status is written. Errors which don't conform to `GRPCStatusTransformable`
Expand Down Expand Up @@ -122,8 +121,8 @@ extension _BaseCallHandler: ChannelInboundHandler {
}

extension _BaseCallHandler: ChannelOutboundHandler {
public typealias OutboundIn = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundOut = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundIn = _GRPCServerResponsePart<Response>
public typealias OutboundOut = _GRPCServerResponsePart<Response>

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard self.serverCanWrite else {
Expand Down
Loading