Skip to content

Commit 86c421e

Browse files
author
mustiikhalil
authored
Adds the ability to use different payloads (#710)
Motivation: gRPC should be agnostic to the type of payload that it sends to the peer to allow for other message types to be used (e.g. Flatbuffers). Modifications: - Make the generated code generic over `GRPCPayload` - Provide a default implementation of `GRPCPayload` for `SwiftProtobuf.Message` in `GRPCProtobufPayload` - Update generated code such that request and response types conform to `GRPCProtobufPayload` - Shuffle around some of the server code to minimise payload copies Result: gRPC can send/receive payloads which conform to `GRPCPayload` instead of `SwiftProtobuf.Message`
1 parent 5f40a43 commit 86c421e

37 files changed

+323
-201
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ project.xcworkspace
33
xcuserdata
44
DerivedData/
55
.build
6+
.swiftpm
67
build
78
/protoc-gen-swift
89
/protoc-gen-grpc-swift

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ plugins: ${PROTOC_GEN_SWIFT} ${PROTOC_GEN_GRPC_SWIFT}
3131
${PROTOC_GEN_SWIFT}:
3232
${SWIFT_BUILD_RELEASE} --product protoc-gen-swift
3333

34-
${PROTOC_GEN_GRPC_SWIFT}:
34+
${PROTOC_GEN_GRPC_SWIFT}: Sources/protoc-gen-grpc-swift/*.swift
3535
${SWIFT_BUILD_RELEASE} --product protoc-gen-grpc-swift
3636

3737
interop-test-runner:

Sources/Examples/Echo/Model/echo.grpc.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,7 @@ extension Echo_EchoProvider {
149149
}
150150
}
151151

152+
153+
/// Provides conformance to `GRPCPayload` for the request and response messages
154+
extension Echo_EchoRequest: GRPCProtobufPayload {}
155+
extension Echo_EchoResponse: GRPCProtobufPayload {}

Sources/Examples/HelloWorld/Model/helloworld.grpc.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,7 @@ extension Helloworld_GreeterProvider {
8484
}
8585
}
8686

87+
88+
/// Provides conformance to `GRPCPayload` for the request and response messages
89+
extension Helloworld_HelloRequest: GRPCProtobufPayload {}
90+
extension Helloworld_HelloReply: GRPCProtobufPayload {}

Sources/Examples/RouteGuide/Model/route_guide.grpc.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,10 @@ extension Routeguide_RouteGuideProvider {
149149
}
150150
}
151151

152+
153+
/// Provides conformance to `GRPCPayload` for the request and response messages
154+
extension Routeguide_Point: GRPCProtobufPayload {}
155+
extension Routeguide_Feature: GRPCProtobufPayload {}
156+
extension Routeguide_Rectangle: GRPCProtobufPayload {}
157+
extension Routeguide_RouteSummary: GRPCProtobufPayload {}
158+
extension Routeguide_RouteNote: GRPCProtobufPayload {}

Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import Logging
2626
/// they can fail the observer block future.
2727
/// - To close the call and send the status, complete `context.statusPromise`.
2828
public class BidirectionalStreamingCallHandler<
29-
RequestMessage: Message,
30-
ResponseMessage: Message
31-
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
32-
public typealias Context = StreamingResponseCallContext<ResponseMessage>
33-
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
29+
RequestPayload: GRPCPayload,
30+
ResponsePayload: GRPCPayload
31+
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
32+
public typealias Context = StreamingResponseCallContext<ResponsePayload>
33+
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
3434
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
3535

3636
private var observerState: ClientStreamingHandlerObserverState<EventObserverFactory, EventObserver> {
@@ -44,7 +44,7 @@ public class BidirectionalStreamingCallHandler<
4444
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
4545
public init(
4646
callHandlerContext: CallHandlerContext,
47-
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>
47+
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
4848
) {
4949
// Delay the creation of the event observer until `handlerAdded(context:)`, otherwise it is
5050
// possible for the service to write into the pipeline (by fulfilling the status promise
@@ -53,7 +53,7 @@ public class BidirectionalStreamingCallHandler<
5353

5454
super.init(callHandlerContext: callHandlerContext)
5555

56-
let context = StreamingResponseCallContextImpl<ResponseMessage>(
56+
let context = StreamingResponseCallContextImpl<ResponsePayload>(
5757
channel: self.callHandlerContext.channel,
5858
request: self.callHandlerContext.request,
5959
errorDelegate: self.callHandlerContext.errorDelegate,
@@ -81,12 +81,12 @@ public class BidirectionalStreamingCallHandler<
8181

8282
// Terminate the call if the future providing an observer fails.
8383
// This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
84-
// translate our outgoing `GRPCServerResponsePart<ResponseMessage>` message is already present on the channel.
84+
// translate our outgoing `GRPCServerResponsePart<ResponsePayload>` message is already present on the channel.
8585
// Otherwise, our `OutboundOut` type would not match the `OutboundIn` type of the next handler on the channel.
8686
eventObserver.cascadeFailure(to: callContext.statusPromise)
8787
}
8888

89-
internal override func processMessage(_ message: RequestMessage) {
89+
internal override func processMessage(_ message: RequestPayload) {
9090
guard case .created(let eventObserver) = self.observerState else {
9191
self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring message \(message)")
9292
return

Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@ enum ClientStreamingHandlerObserverState<Factory, Observer> {
3434
/// they can fail the observer block future.
3535
/// - To close the call and send the response, complete `context.responsePromise`.
3636
public final class ClientStreamingCallHandler<
37-
RequestMessage: Message,
38-
ResponseMessage: Message
39-
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
40-
public typealias Context = UnaryResponseCallContext<ResponseMessage>
41-
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
37+
RequestPayload: GRPCPayload,
38+
ResponsePayload: GRPCPayload
39+
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
40+
public typealias Context = UnaryResponseCallContext<ResponsePayload>
41+
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
4242
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
4343

4444
private var observerState: ClientStreamingHandlerObserverState<EventObserverFactory, EventObserver> {
4545
willSet(newState) {
4646
self.logger.debug("observerState changed from \(self.observerState) to \(newState)")
4747
}
4848
}
49-
private var callContext: UnaryResponseCallContext<ResponseMessage>?
49+
private var callContext: UnaryResponseCallContext<ResponsePayload>?
5050

5151
// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
5252
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
@@ -58,7 +58,7 @@ public final class ClientStreamingCallHandler<
5858

5959
super.init(callHandlerContext: callHandlerContext)
6060

61-
let callContext = UnaryResponseCallContextImpl<ResponseMessage>(
61+
let callContext = UnaryResponseCallContextImpl<ResponsePayload>(
6262
channel: self.callHandlerContext.channel,
6363
request: self.callHandlerContext.request,
6464
errorDelegate: self.callHandlerContext.errorDelegate,
@@ -86,12 +86,12 @@ public final class ClientStreamingCallHandler<
8686

8787
// Terminate the call if the future providing an observer fails.
8888
// This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
89-
// translate our outgoing `GRPCServerResponsePart<ResponseMessage>` message is already present on the channel.
89+
// translate our outgoing `GRPCServerResponsePart<ResponsePayload>` message is already present on the channel.
9090
// Otherwise, our `OutboundOut` type would not match the `OutboundIn` type of the next handler on the channel.
9191
eventObserver.cascadeFailure(to: callContext.responsePromise)
9292
}
9393

94-
internal override func processMessage(_ message: RequestMessage) {
94+
internal override func processMessage(_ message: RequestPayload) {
9595
guard case .created(let eventObserver) = self.observerState else {
9696
self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring message \(message)")
9797
return

Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,20 @@ import Logging
2424
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
2525
/// - To close the call and send the status, complete the status future returned by the observer block.
2626
public final class ServerStreamingCallHandler<
27-
RequestMessage: Message,
28-
ResponseMessage: Message
29-
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
30-
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>
27+
RequestPayload: GRPCPayload,
28+
ResponsePayload: GRPCPayload
29+
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
30+
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<GRPCStatus>
3131

3232
private var eventObserver: EventObserver?
33-
private var callContext: StreamingResponseCallContext<ResponseMessage>?
33+
private var callContext: StreamingResponseCallContext<ResponsePayload>?
3434

3535
public init(
3636
callHandlerContext: CallHandlerContext,
37-
eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver
37+
eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
3838
) {
3939
super.init(callHandlerContext: callHandlerContext)
40-
let callContext = StreamingResponseCallContextImpl<ResponseMessage>(
40+
let callContext = StreamingResponseCallContextImpl<ResponsePayload>(
4141
channel: self.callHandlerContext.channel,
4242
request: self.callHandlerContext.request,
4343
errorDelegate: self.callHandlerContext.errorDelegate,
@@ -53,7 +53,7 @@ public final class ServerStreamingCallHandler<
5353
}
5454
}
5555

56-
override internal func processMessage(_ message: RequestMessage) throws {
56+
override internal func processMessage(_ message: RequestPayload) throws {
5757
guard let eventObserver = self.eventObserver,
5858
let callContext = self.callContext else {
5959
self.logger.error("processMessage(_:) called before the call started or after the call completed")

Sources/GRPC/CallHandlers/UnaryCallHandler.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ import Logging
2525
/// - To return a response to the client, the framework user should complete that future
2626
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
2727
public final class UnaryCallHandler<
28-
RequestMessage: Message,
29-
ResponseMessage: Message
30-
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
31-
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
28+
RequestPayload: GRPCPayload,
29+
ResponsePayload: GRPCPayload
30+
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
31+
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
3232
private var eventObserver: EventObserver?
33-
private var callContext: UnaryResponseCallContext<ResponseMessage>?
33+
private var callContext: UnaryResponseCallContext<ResponsePayload>?
3434

3535
public init(
3636
callHandlerContext: CallHandlerContext,
37-
eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver
37+
eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
3838
) {
3939
super.init(callHandlerContext: callHandlerContext)
40-
let callContext = UnaryResponseCallContextImpl<ResponseMessage>(
40+
let callContext = UnaryResponseCallContextImpl<ResponsePayload>(
4141
channel: self.callHandlerContext.channel,
4242
request: self.callHandlerContext.request,
4343
errorDelegate: self.callHandlerContext.errorDelegate,
@@ -53,7 +53,7 @@ public final class UnaryCallHandler<
5353
}
5454
}
5555

56-
internal override func processMessage(_ message: RequestMessage) throws {
56+
internal override func processMessage(_ message: RequestPayload) throws {
5757
guard let eventObserver = self.eventObserver,
5858
let context = self.callContext else {
5959
self.logger.error("processMessage(_:) called before the call started or after the call completed")

Sources/GRPC/CallHandlers/_BaseCallHandler.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ import Logging
2323
///
2424
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
2525
/// - Important: This is **NOT** part of the public API.
26-
public class _BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler {
26+
public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
2727
public func makeGRPCServerCodec() -> ChannelHandler {
28-
return GRPCServerCodec<RequestMessage, ResponseMessage>()
28+
return GRPCServerCodec<RequestPayload, ResponsePayload>()
2929
}
3030

3131
/// Called whenever a message has been received.
3232
///
3333
/// Overridden by subclasses.
34-
internal func processMessage(_ message: RequestMessage) throws {
34+
internal func processMessage(_ message: RequestPayload) throws {
3535
fatalError("needs to be overridden")
3636
}
3737

@@ -71,7 +71,7 @@ public class _BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>
7171
}
7272

7373
extension _BaseCallHandler: ChannelInboundHandler {
74-
public typealias InboundIn = _GRPCServerRequestPart<RequestMessage>
74+
public typealias InboundIn = _GRPCServerRequestPart<RequestPayload>
7575

7676
/// Passes errors to the user-provided `errorHandler`. After an error has been received an
7777
/// appropriate status is written. Errors which don't conform to `GRPCStatusTransformable`
@@ -120,8 +120,8 @@ extension _BaseCallHandler: ChannelInboundHandler {
120120
}
121121

122122
extension _BaseCallHandler: ChannelOutboundHandler {
123-
public typealias OutboundIn = _GRPCServerResponsePart<ResponseMessage>
124-
public typealias OutboundOut = _GRPCServerResponsePart<ResponseMessage>
123+
public typealias OutboundIn = _GRPCServerResponsePart<ResponsePayload>
124+
public typealias OutboundOut = _GRPCServerResponsePart<ResponsePayload>
125125

126126
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
127127
guard self.serverCanWrite else {

Sources/GRPC/ClientCalls/BaseClientCall.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ import Logging
4848
///
4949
/// This class also provides much of the framework user facing functionality via conformance to
5050
/// `ClientCall`.
51-
public class BaseClientCall<Request: Message, Response: Message>: ClientCall {
52-
public typealias RequestMessage = Request
53-
public typealias ResponseMessage = Response
51+
public class BaseClientCall<Request: GRPCPayload, Response: GRPCPayload>: ClientCall {
52+
public typealias RequestPayload = Request
53+
public typealias ResponsePayload = Response
5454

5555
internal let logger: Logger
5656

Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import Logging
2727
/// - `initialMetadata`: the initial metadata returned from the server,
2828
/// - `status`: the status of the gRPC call after it has ended,
2929
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
30-
public final class BidirectionalStreamingCall<RequestMessage: Message, ResponseMessage: Message>
31-
: BaseClientCall<RequestMessage, ResponseMessage>,
30+
public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>
31+
: BaseClientCall<RequestPayload, ResponsePayload>,
3232
StreamingRequestClientCall {
3333
private var messageQueue: EventLoopFuture<Void>
3434

@@ -37,7 +37,7 @@ public final class BidirectionalStreamingCall<RequestMessage: Message, ResponseM
3737
path: String,
3838
callOptions: CallOptions,
3939
errorDelegate: ClientErrorDelegate?,
40-
handler: @escaping (ResponseMessage) -> Void
40+
handler: @escaping (ResponsePayload) -> Void
4141
) {
4242
self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
4343
let requestID = callOptions.requestIDProvider.requestID()
@@ -64,7 +64,7 @@ public final class BidirectionalStreamingCall<RequestMessage: Message, ResponseM
6464
options: callOptions
6565
)
6666

67-
let requestHandler = _StreamingRequestChannelHandler<RequestMessage>(requestHead: requestHead)
67+
let requestHandler = _StreamingRequestChannelHandler<RequestPayload>(requestHead: requestHead)
6868

6969
super.init(
7070
eventLoop: connection.eventLoop,

0 commit comments

Comments
 (0)