Skip to content

Commit f5e9f93

Browse files
author
mustii
committed
Adds direct access to buffer by using a payload protocol
1 parent afad4f4 commit f5e9f93

25 files changed

+165
-50
lines changed

Sources/Examples/Echo/Model/echo.grpc.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,12 @@ extension Echo_EchoProvider {
149149
}
150150
}
151151

152+
extension Echo_EchoRequest: Payload {
153+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
154+
public func serialize(into buffer: inout ByteBuffer) throws {}
155+
}
156+
157+
extension Echo_EchoResponse: Payload {
158+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
159+
public func serialize(into buffer: inout ByteBuffer) throws {}
160+
}

Sources/Examples/HelloWorld/Model/helloworld.grpc.swift

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,31 @@ extension Helloworld_GreeterProvider {
8484
}
8585
}
8686

87+
extension Helloworld_HelloRequest: Payload {
88+
public init(serializedByteBuffer: inout ByteBuffer) throws {
89+
try self.init(serializedData: serializedByteBuffer.readData(length: serializedByteBuffer.readableBytes)!,
90+
extensions: nil,
91+
partial: false,
92+
options: BinaryDecodingOptions())
93+
}
94+
95+
public func serialize(into buffer: inout ByteBuffer) throws {
96+
let data = try serializedData()
97+
buffer.writeBytes(data)
98+
}
99+
}
100+
101+
extension Helloworld_HelloReply: Payload {
102+
public init(serializedByteBuffer: inout ByteBuffer) throws {
103+
try self.init(serializedData: serializedByteBuffer.readData(length: serializedByteBuffer.readableBytes)!,
104+
extensions: nil,
105+
partial: false,
106+
options: BinaryDecodingOptions())
107+
}
108+
109+
public func serialize(into buffer: inout ByteBuffer) throws {
110+
let data = try serializedData()
111+
buffer.writeBytes(data)
112+
}
113+
}
114+

Sources/Examples/RouteGuide/Model/route_guide.grpc.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,28 @@ extension Routeguide_RouteGuideProvider {
149149
}
150150
}
151151

152+
extension Routeguide_RouteNote: Payload {
153+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
154+
public func serialize(into buffer: inout ByteBuffer) throws {}
155+
}
156+
157+
extension Routeguide_Feature: Payload {
158+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
159+
public func serialize(into buffer: inout ByteBuffer) throws {}
160+
}
161+
162+
extension Routeguide_RouteSummary: Payload {
163+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
164+
public func serialize(into buffer: inout ByteBuffer) throws {}
165+
}
166+
167+
extension Routeguide_Point: Payload {
168+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
169+
public func serialize(into buffer: inout ByteBuffer) throws {}
170+
}
171+
172+
extension Routeguide_Rectangle: Payload {
173+
public init(serializedByteBuffer: inout ByteBuffer) throws {}
174+
public func serialize(into buffer: inout ByteBuffer) throws {}
175+
}
176+

Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import Logging
2626
/// they can fail the observer block future.
2727
/// - To close the call and send the status, complete `context.statusPromise`.
2828
public class BidirectionalStreamingCallHandler<
29-
RequestMessage: Message,
30-
ResponseMessage: Message
29+
RequestMessage: Payload,
30+
ResponseMessage: Payload
3131
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
3232
public typealias Context = StreamingResponseCallContext<ResponseMessage>
3333
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void

Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ enum ClientStreamingHandlerObserverState<Factory, Observer> {
3434
/// they can fail the observer block future.
3535
/// - To close the call and send the response, complete `context.responsePromise`.
3636
public final class ClientStreamingCallHandler<
37-
RequestMessage: Message,
38-
ResponseMessage: Message
37+
RequestMessage: Payload,
38+
ResponseMessage: Payload
3939
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
4040
public typealias Context = UnaryResponseCallContext<ResponseMessage>
4141
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void

Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import Logging
2424
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
2525
/// - To close the call and send the status, complete the status future returned by the observer block.
2626
public final class ServerStreamingCallHandler<
27-
RequestMessage: Message,
28-
ResponseMessage: Message
27+
RequestMessage: Payload,
28+
ResponseMessage: Payload
2929
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
3030
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>
3131

Sources/GRPC/CallHandlers/UnaryCallHandler.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import Logging
2525
/// - To return a response to the client, the framework user should complete that future
2626
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
2727
public final class UnaryCallHandler<
28-
RequestMessage: Message,
29-
ResponseMessage: Message
28+
RequestMessage: Payload,
29+
ResponseMessage: Payload
3030
>: _BaseCallHandler<RequestMessage, ResponseMessage> {
3131
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
3232
private var eventObserver: EventObserver?

Sources/GRPC/CallHandlers/_BaseCallHandler.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import Logging
2323
///
2424
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
2525
/// - Important: This is **NOT** part of the public API.
26-
public class _BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler {
26+
public class _BaseCallHandler<RequestMessage: Payload, ResponseMessage: Payload>: GRPCCallHandler {
2727
public func makeGRPCServerCodec() -> ChannelHandler {
2828
return GRPCServerCodec<RequestMessage, ResponseMessage>()
2929
}

Sources/GRPC/ClientCalls/BaseClientCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import Logging
4848
///
4949
/// This class also provides much of the framework user facing functionality via conformance to
5050
/// `ClientCall`.
51-
public class BaseClientCall<Request: Message, Response: Message>: ClientCall {
51+
public class BaseClientCall<Request: Payload, Response: Payload>: ClientCall {
5252
public typealias RequestMessage = Request
5353
public typealias ResponseMessage = Response
5454

Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import Logging
2727
/// - `initialMetadata`: the initial metadata returned from the server,
2828
/// - `status`: the status of the gRPC call after it has ended,
2929
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
30-
public final class BidirectionalStreamingCall<RequestMessage: Message, ResponseMessage: Message>
30+
public final class BidirectionalStreamingCall<RequestMessage: Payload, ResponseMessage: Payload>
3131
: BaseClientCall<RequestMessage, ResponseMessage>,
3232
StreamingRequestClientCall {
3333
private var messageQueue: EventLoopFuture<Void>

Sources/GRPC/ClientCalls/ClientCall.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import SwiftProtobuf
2323
/// Base protocol for a client call to a gRPC service.
2424
public protocol ClientCall {
2525
/// The type of the request message for the call.
26-
associatedtype RequestMessage: Message
26+
associatedtype RequestMessage: Payload
2727
/// The type of the response message for the call.
28-
associatedtype ResponseMessage: Message
28+
associatedtype ResponseMessage: Payload
2929

3030
/// HTTP/2 stream that requests and responses are sent and received on.
3131
var subchannel: EventLoopFuture<Channel> { get }

Sources/GRPC/ClientCalls/ClientStreamingCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import Logging
2828
/// - `response`: the response from the call,
2929
/// - `status`: the status of the gRPC call after it has ended,
3030
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
31-
public final class ClientStreamingCall<RequestMessage: Message, ResponseMessage: Message>
31+
public final class ClientStreamingCall<RequestMessage: Payload, ResponseMessage: Payload>
3232
: BaseClientCall<RequestMessage, ResponseMessage>,
3333
StreamingRequestClientCall,
3434
UnaryResponseClientCall {

Sources/GRPC/ClientCalls/ServerStreamingCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import Logging
2424
/// - `initialMetadata`: the initial metadata returned from the server,
2525
/// - `status`: the status of the gRPC call after it has ended,
2626
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
27-
public final class ServerStreamingCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage> {
27+
public final class ServerStreamingCall<RequestMessage: Payload, ResponseMessage: Payload>: BaseClientCall<RequestMessage, ResponseMessage> {
2828
public init(
2929
connection: ClientConnection,
3030
path: String,

Sources/GRPC/ClientCalls/UnaryCall.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import Logging
2727
/// - `response`: the response from the unary call,
2828
/// - `status`: the status of the gRPC call after it has ended,
2929
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
30-
public final class UnaryCall<RequestMessage: Message, ResponseMessage: Message>
30+
public final class UnaryCall<RequestMessage: Payload, ResponseMessage: Payload>
3131
: BaseClientCall<RequestMessage, ResponseMessage>,
3232
UnaryResponseClientCall {
3333
public let response: EventLoopFuture<ResponseMessage>

Sources/GRPC/GRPCClientResponseChannelHandler.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import Logging
2424
/// This includes holding promises for the initial metadata and status of the gRPC call. This handler
2525
/// is also responsible for error handling, via an error delegate and by appropriately failing the
2626
/// aforementioned promises.
27-
internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
27+
internal class GRPCClientResponseChannelHandler<ResponseMessage: Payload>: ChannelInboundHandler {
2828
public typealias InboundIn = _GRPCClientResponsePart<ResponseMessage>
2929
internal let logger: Logger
3030
internal var stopwatch: Stopwatch?
@@ -186,7 +186,7 @@ internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: Chann
186186
}
187187

188188
/// A channel handler for client calls which receive a single response.
189-
final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
189+
final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Payload>: GRPCClientResponseChannelHandler<ResponseMessage> {
190190
let responsePromise: EventLoopPromise<ResponseMessage>
191191

192192
internal init(
@@ -236,7 +236,7 @@ final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRP
236236
}
237237

238238
/// A channel handler for client calls which receive a stream of responses.
239-
final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
239+
final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Payload>: GRPCClientResponseChannelHandler<ResponseMessage> {
240240
typealias ResponseHandler = (ResponseMessage) -> Void
241241

242242
let responseHandler: ResponseHandler

Sources/GRPC/GRPCClientStateMachine.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ enum SendEndOfRequestStreamError: Error {
6666
/// A state machine for a single gRPC call from the perspective of a client.
6767
///
6868
/// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
69-
struct GRPCClientStateMachine<Request: Message, Response: Message> {
69+
struct GRPCClientStateMachine<Request: Payload, Response: Payload> {
7070
/// The combined state of the request (client) and response (server) streams for an RPC call.
7171
///
7272
/// The following states are not possible:

Sources/GRPC/GRPCServerCodec.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import NIOHTTP1
2222
/// Incoming gRPC package with a fixed message type.
2323
///
2424
/// - Important: This is **NOT** part of the public API.
25-
public enum _GRPCServerRequestPart<RequestMessage: Message> {
25+
public enum _GRPCServerRequestPart<RequestMessage: Payload> {
2626
case head(HTTPRequestHead)
2727
case message(RequestMessage)
2828
case end
@@ -31,14 +31,14 @@ public enum _GRPCServerRequestPart<RequestMessage: Message> {
3131
/// Outgoing gRPC package with a fixed message type.
3232
///
3333
/// - Important: This is **NOT** part of the public API.
34-
public enum _GRPCServerResponsePart<ResponseMessage: Message> {
34+
public enum _GRPCServerResponsePart<ResponseMessage: Payload> {
3535
case headers(HTTPHeaders)
3636
case message(ResponseMessage)
3737
case statusAndTrailers(GRPCStatus, HTTPHeaders)
3838
}
3939

4040
/// A simple channel handler that translates raw gRPC packets into decoded protobuf messages, and vice versa.
41-
internal final class GRPCServerCodec<RequestMessage: Message, ResponseMessage: Message> {}
41+
internal final class GRPCServerCodec<RequestMessage: Payload, ResponseMessage: Payload> {}
4242

4343
extension GRPCServerCodec: ChannelInboundHandler {
4444
typealias InboundIn = _RawGRPCServerRequestPart
@@ -50,9 +50,8 @@ extension GRPCServerCodec: ChannelInboundHandler {
5050
context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
5151

5252
case .message(var message):
53-
let messageAsData = message.readData(length: message.readableBytes)!
5453
do {
55-
context.fireChannelRead(self.wrapInboundOut(.message(try RequestMessage(serializedData: messageAsData))))
54+
context.fireChannelRead(self.wrapInboundOut(.message(try RequestMessage(serializedByteBuffer: &message))))
5655
} catch {
5756
context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
5857
}
@@ -75,8 +74,9 @@ extension GRPCServerCodec: ChannelOutboundHandler {
7574

7675
case .message(let message):
7776
do {
78-
let messageData = try message.serializedData()
79-
context.write(self.wrapOutboundOut(.message(messageData)), promise: promise)
77+
var buffer = ByteBufferAllocator().buffer(capacity: 0)
78+
try message.serialize(into: &buffer)
79+
context.write(self.wrapOutboundOut(.message(buffer.readData(length: buffer.readableBytes)!)), promise: promise)
8080
} catch {
8181
let error = GRPCError.SerializationFailure().captureContext()
8282
promise?.fail(error)

Sources/GRPC/Payload.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import NIO
2+
import SwiftProtobuf
3+
4+
public protocol Payload {
5+
init(serializedByteBuffer: inout NIO.ByteBuffer) throws
6+
func serialize(into buffer: inout NIO.ByteBuffer) throws
7+
}

Sources/GRPC/ReadWriteStates.swift

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,25 @@ enum WriteState {
5757
/// - Parameter allocator: An allocator to provide a `ByteBuffer` into which the message will be
5858
/// written.
5959
mutating func write(
60-
_ message: Message,
60+
_ message: Payload,
6161
allocator: ByteBufferAllocator
6262
) -> Result<ByteBuffer, MessageWriteError> {
6363
switch self {
6464
case .notWriting:
6565
return .failure(.cardinalityViolation)
6666

6767
case let .writing(writeArity, contentType, writer):
68-
guard let data = try? message.serializedData() else {
68+
// Zero is fine: the writer will allocate the correct amount of space.
69+
var buffer = allocator.buffer(capacity: 0)
70+
71+
do {
72+
try message.serialize(into: &buffer)
73+
} catch {
6974
self = .notWriting
7075
return .failure(.serializationFailed)
7176
}
72-
73-
// Zero is fine: the writer will allocate the correct amount of space.
74-
var buffer = allocator.buffer(capacity: 0)
75-
writer.write(data, into: &buffer)
76-
77+
var buf = allocator.buffer(capacity: 0)
78+
writer.write(buffer.readData(length: buffer.readableBytes)!, into: &buf)
7779
// If we only expect to write one message then we're no longer writable.
7880
if case .one = writeArity {
7981
self = .notWriting
@@ -113,7 +115,7 @@ enum ReadState {
113115
/// a message has been produced then subsequent calls will result in an error.
114116
///
115117
/// - Parameter buffer: The buffer to read from.
116-
mutating func readMessages<MessageType: Message>(
118+
mutating func readMessages<MessageType: Payload>(
117119
_ buffer: inout ByteBuffer,
118120
as: MessageType.Type = MessageType.self
119121
) -> Result<[MessageType], MessageReadError> {
@@ -128,8 +130,7 @@ enum ReadState {
128130
do {
129131
while var serializedBytes = try? reader.nextMessage() {
130132
// Force unwrapping is okay here: we will always be able to read `readableBytes`.
131-
let serializedData = serializedBytes.readData(length: serializedBytes.readableBytes)!
132-
messages.append(try MessageType(serializedData: serializedData))
133+
messages.append(try MessageType(serializedByteBuffer: &serializedBytes))
133134
}
134135
} catch {
135136
self = .notReading

Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import Logging
2525
/// - If `statusPromise` is failed and the error is of type `GRPCStatusTransformable`,
2626
/// the result of `error.asGRPCStatus()` will be returned to the client.
2727
/// - If `error.asGRPCStatus()` is not available, `GRPCStatus.processingError` is returned to the client.
28-
open class StreamingResponseCallContext<ResponseMessage: Message>: ServerCallContextBase {
28+
open class StreamingResponseCallContext<ResponseMessage: Payload>: ServerCallContextBase {
2929
typealias WrappedResponse = _GRPCServerResponsePart<ResponseMessage>
3030

3131
public let statusPromise: EventLoopPromise<GRPCStatus>
@@ -41,7 +41,7 @@ open class StreamingResponseCallContext<ResponseMessage: Message>: ServerCallCon
4141
}
4242

4343
/// Concrete implementation of `StreamingResponseCallContext` used by our generated code.
44-
open class StreamingResponseCallContextImpl<ResponseMessage: Message>: StreamingResponseCallContext<ResponseMessage> {
44+
open class StreamingResponseCallContextImpl<ResponseMessage: Payload>: StreamingResponseCallContext<ResponseMessage> {
4545
public let channel: Channel
4646

4747
/// - Parameters:
@@ -80,7 +80,7 @@ open class StreamingResponseCallContextImpl<ResponseMessage: Message>: Streaming
8080
/// Concrete implementation of `StreamingResponseCallContext` used for testing.
8181
///
8282
/// Simply records all sent messages.
83-
open class StreamingResponseCallContextTestStub<ResponseMessage: Message>: StreamingResponseCallContext<ResponseMessage> {
83+
open class StreamingResponseCallContextTestStub<ResponseMessage: Payload>: StreamingResponseCallContext<ResponseMessage> {
8484
open var recordedResponses: [ResponseMessage] = []
8585

8686
open override func sendResponse(_ message: ResponseMessage) -> EventLoopFuture<Void> {

Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import Logging
2828
///
2929
/// For unary calls, the response is not actually provided by fulfilling `responsePromise`, but instead by completing
3030
/// the future returned by `UnaryCallHandler.EventObserver`.
31-
open class UnaryResponseCallContext<ResponseMessage: Message>: ServerCallContextBase, StatusOnlyCallContext {
31+
open class UnaryResponseCallContext<ResponseMessage: Payload>: ServerCallContextBase, StatusOnlyCallContext {
3232
typealias WrappedResponse = _GRPCServerResponsePart<ResponseMessage>
3333

3434
public let responsePromise: EventLoopPromise<ResponseMessage>
@@ -55,7 +55,7 @@ public protocol StatusOnlyCallContext: ServerCallContext {
5555
}
5656

5757
/// Concrete implementation of `UnaryResponseCallContext` used by our generated code.
58-
open class UnaryResponseCallContextImpl<ResponseMessage: Message>: UnaryResponseCallContext<ResponseMessage> {
58+
open class UnaryResponseCallContextImpl<ResponseMessage: Payload>: UnaryResponseCallContext<ResponseMessage> {
5959
public let channel: Channel
6060

6161
/// - Parameters:
@@ -93,4 +93,4 @@ open class UnaryResponseCallContextImpl<ResponseMessage: Message>: UnaryResponse
9393
/// Concrete implementation of `UnaryResponseCallContext` used for testing.
9494
///
9595
/// Only provided to make it clear in tests that no "real" implementation is used.
96-
open class UnaryResponseCallContextTestStub<ResponseMessage: Message>: UnaryResponseCallContext<ResponseMessage> { }
96+
open class UnaryResponseCallContextTestStub<ResponseMessage: Payload>: UnaryResponseCallContext<ResponseMessage> { }

Sources/GRPC/StreamEvent.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import SwiftProtobuf
1717

1818
/// An event that can occur on a client-streaming RPC. Provided to the event observer registered for that call.
19-
public enum StreamEvent<Message: SwiftProtobuf.Message> {
19+
public enum StreamEvent<Message: Payload> {
2020
case message(Message)
2121
case end
2222
//! FIXME: Also support errors in this type, to propagate them to the event handler.

0 commit comments

Comments
 (0)