Skip to content

Commit 75f3133

Browse files
author
mustii
committed
Move the logic for reading the payload to GRPCServerCodec, and fixed the logic
1 parent 1e2d71b commit 75f3133

File tree

5 files changed

+55
-55
lines changed

5 files changed

+55
-55
lines changed

Sources/GRPC/GRPCServerCodec.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
4141
internal final class GRPCServerCodec<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload> {
4242
// 1-byte for compression flag, 4-bytes for message length.
4343
private let protobufMetadataSize = 5
44+
var messageWriter = LengthPrefixedMessageWriter(compression: .none)
4445
}
4546

4647
extension GRPCServerCodec: ChannelInboundHandler {
@@ -77,9 +78,9 @@ extension GRPCServerCodec: ChannelOutboundHandler {
7778

7879
case .message(let message):
7980
do {
80-
var buffer = ByteBufferAllocator().buffer(capacity: 0)
81-
try message.serialize(into: &buffer)
82-
context.write(self.wrapOutboundOut(.message(buffer)), promise: promise)
81+
var responseBuffer = context.channel.allocator.buffer(capacity: protobufMetadataSize)
82+
try messageWriter.write(message, into: &responseBuffer)
83+
context.write(self.wrapOutboundOut(.message(responseBuffer)), promise: promise)
8384
} catch {
8485
let error = GRPCError.SerializationFailure().captureContext()
8586
promise?.fail(error)

Sources/GRPC/HTTP1ToRawGRPCServerCodec.swift

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public final class HTTP1ToRawGRPCServerCodec {
5858
private let logger: Logger
5959
private let accessLog: Logger
6060
private var stopwatch: Stopwatch?
61+
62+
// 1-byte for compression flag, 4-bytes for message length.
63+
private let protobufMetadataSize = 5
6164

6265
// The following buffers use force unwrapping explicitly. With optionals, developers
6366
// are encouraged to unwrap them using guard-else statements. These don't work cleanly
@@ -88,7 +91,6 @@ public final class HTTP1ToRawGRPCServerCodec {
8891
}
8992
}
9093

91-
var messageWriter = LengthPrefixedMessageWriter(compression: .none)
9294
var messageReader: LengthPrefixedMessageReader
9395
}
9496

@@ -257,27 +259,29 @@ extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
257259
self.logger.error("invalid state '\(self.outboundState)' while writing message", metadata: ["message": "\(messageBytes)"])
258260
return
259261
}
260-
var messageBytes = messageBytes
262+
261263
if contentType == .text {
262264
precondition(self.responseTextBuffer != nil)
263265

264266
// Store the response into an independent buffer. We can't return the message directly as
265267
// it needs to be aggregated with all the responses plus the trailers, in order to have
266268
// the base64 response properly encoded in a single byte stream.
267-
//
268-
// We can try! here because the server does not support compression at the moment and this
269-
// only throws when compression fails.
270-
try! messageWriter.write(&messageBytes, into: &self.responseTextBuffer)
271-
269+
270+
var messageBytes = messageBytes
271+
// disregards the first 5 bytes since the rewritting them to the buffer doesnt require them
272+
messageBytes.moveReaderIndex(forwardBy: protobufMetadataSize)
273+
// Writes the compression bit as 'Zero' since the server cant compress data
274+
self.responseTextBuffer.writeInteger(UInt8(0))
275+
276+
// copying the old data to the responseTextBuffer
277+
self.responseTextBuffer.writeInteger(UInt32(messageBytes.readableBytes))
278+
self.responseTextBuffer.writeBytes(messageBytes.readableBytesView)
279+
272280
// Since we stored the written data, mark the write promise as successful so that the
273281
// ServerStreaming provider continues sending the data.
274282
promise?.succeed(())
275283
} else {
276-
var responseBuffer = context.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength)
277-
// We can try! here because the server does not support compression at the moment and this
278-
// only throws when compression fails.
279-
try! messageWriter.write(&messageBytes, into: &responseBuffer)
280-
context.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
284+
context.write(self.wrapOutboundOut(.body(.byteBuffer(messageBytes))), promise: promise)
281285
}
282286
self.outboundState = .expectingBodyOrStatus
283287

Sources/GRPC/LengthPrefixedMessageWriter.swift

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,39 +49,53 @@ internal struct LengthPrefixedMessageWriter {
4949
/// - Returns: A `ByteBuffer` containing a gRPC length-prefixed message.
5050
/// - Precondition: `compression.supported` is `true`.
5151
/// - Note: See `LengthPrefixedMessageReader` for more details on the format.
52-
func write(_ payload: inout ByteBuffer, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {
53-
buffer.reserveCapacity(LengthPrefixedMessageWriter.metadataLength + payload.readableBytes)
54-
52+
func write(_ payload: GRPCPayload, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {
53+
54+
buffer.reserveCapacity(LengthPrefixedMessageWriter.metadataLength)
55+
5556
if !disableCompression, let compressor = self.compressor {
5657
// Set the compression byte.
5758
buffer.writeInteger(UInt8(1))
58-
59+
5960
// Leave a gap for the length, we'll set it in a moment.
6061
let payloadSizeIndex = buffer.writerIndex
6162
buffer.moveWriterIndex(forwardBy: MemoryLayout<UInt32>.size)
6263

64+
// Build the message by serializing it into a Buffer first
65+
// Since deflate can only take a ByteBuffer
66+
var messageBuf = ByteBufferAllocator().buffer(capacity: 0)
67+
try payload.serialize(into: &messageBuf)
68+
6369
// Compress the message.
64-
var payload = payload
65-
let bytesWritten = try compressor.deflate(&payload, into: &buffer)
70+
let bytesWritten = try compressor.deflate(&messageBuf, into: &buffer)
6671

6772
// Now fill in the message length.
6873
buffer.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
6974

7075
// Finally, the compression context should be reset between messages.
7176
compressor.reset()
7277
} else {
78+
let startBufferIndex = buffer.readableBytes
7379
// 'identity' compression has no compressor but should still set the compression bit set
7480
// unless we explicitly disable compression.
7581
if self.compression?.algorithm == .identity && !disableCompression {
7682
buffer.writeInteger(UInt8(1))
7783
} else {
7884
buffer.writeInteger(UInt8(0))
7985
}
86+
87+
// Leave a gap for the length, we'll set it in a moment.
88+
let payloadSizeIndex = buffer.writerIndex
89+
buffer.writeInteger(UInt32(0))
90+
91+
// Writes the payload into the buffer
92+
try payload.serialize(into: &buffer)
93+
94+
// Calculates the Written bytes with respect to the prefixed ones
95+
let writtenFrame = buffer.readableBytes - LengthPrefixedMessageWriter.metadataLength - startBufferIndex
8096

8197
// Write the message length.
82-
buffer.writeInteger(UInt32(payload.readableBytes))
83-
// And the message bytes.
84-
buffer.writeBytes(payload.readableBytesView)
98+
buffer.writePayloadLength(UInt32(writtenFrame), at: payloadSizeIndex)
8599
}
86100
}
87101
}

Sources/GRPC/ReadWriteStates.swift

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,12 @@ enum WriteState {
6363
return .failure(.cardinalityViolation)
6464

6565
case let .writing(writeArity, contentType, writer):
66-
// Zero is fine: the writer will allocate the correct amount of space.
67-
var messageBuffer = allocator.buffer(capacity: 0)
68-
do {
69-
try message.serialize(into: &messageBuffer)
70-
} catch {
71-
self = .notWriting
72-
return .failure(.serializationFailed)
73-
}
7466
// Zero is fine: the writer will allocate the correct amount of space.
7567
var buffer = allocator.buffer(capacity: 0)
7668
do {
77-
try writer.write(&messageBuffer, into: &buffer, disableCompression: disableCompression)
69+
try writer.write(message, into: &buffer)
7870
} catch {
71+
self = .notWriting
7972
return .failure(.serializationFailed)
8073
}
8174

Tests/GRPCTests/GRPCClientStateMachineTests.swift

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,8 @@ class GRPCClientStateMachineTests: GRPCTestCase {
5252

5353
/// Writes a message into the given `buffer`.
5454
func writeMessage<T: GRPCPayload>(_ message: T, into buffer: inout ByteBuffer) throws {
55-
var messageBuffer = allocator.buffer(capacity: 0)
56-
try message.serialize(into: &messageBuffer)
5755
let writer = LengthPrefixedMessageWriter(compression: .none)
58-
try writer.write(&messageBuffer, into: &buffer)
56+
try writer.write(message, into: &buffer)
5957
}
6058

6159
/// Returns a minimally valid `HPACKHeaders` for a response.
@@ -876,10 +874,8 @@ class ReadStateTests: GRPCTestCase {
876874
// Write a message into the buffer:
877875
let message = Echo_EchoRequest.with { $0.text = "Hello!" }
878876
let writer = LengthPrefixedMessageWriter(compression: .none)
879-
var messageBuffer = self.allocator.buffer(capacity: 0)
880-
try message.serialize(into: &messageBuffer)
881877
var buffer = self.allocator.buffer(capacity: 0)
882-
try writer.write(&messageBuffer, into: &buffer)
878+
try writer.write(message, into: &buffer)
883879
// And some extra junk bytes:
884880
let bytes: [UInt8] = [0x00]
885881
buffer.writeBytes(bytes)
@@ -896,11 +892,9 @@ class ReadStateTests: GRPCTestCase {
896892
let message = Echo_EchoRequest.with { $0.text = "Hello!" }
897893
let writer = LengthPrefixedMessageWriter(compression: .none)
898894
var buffer = self.allocator.buffer(capacity: 0)
899-
var messageBuffer = self.allocator.buffer(capacity: 0)
900-
try message.serialize(into: &messageBuffer)
901-
try writer.write(&messageBuffer, into: &buffer)
902-
try writer.write(&messageBuffer, into: &buffer)
903-
895+
try writer.write(message, into: &buffer)
896+
try writer.write(message, into: &buffer)
897+
904898
var state: ReadState = .one()
905899
state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure {
906900
XCTAssertEqual($0, .cardinalityViolation)
@@ -912,10 +906,8 @@ class ReadStateTests: GRPCTestCase {
912906
// Write a message into the buffer twice:
913907
let message = Echo_EchoRequest.with { $0.text = "Hello!" }
914908
let writer = LengthPrefixedMessageWriter(compression: .none)
915-
var messageBuffer = self.allocator.buffer(capacity: 0)
916-
try message.serialize(into: &messageBuffer)
917909
var buffer = self.allocator.buffer(capacity: 0)
918-
try writer.write(&messageBuffer, into: &buffer)
910+
try writer.write(message, into: &buffer)
919911

920912
var state: ReadState = .one()
921913
state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {
@@ -930,10 +922,8 @@ class ReadStateTests: GRPCTestCase {
930922
// Write a message into the buffer twice:
931923
let message = Echo_EchoRequest.with { $0.text = "Hello!" }
932924
let writer = LengthPrefixedMessageWriter(compression: .none)
933-
var messageBuffer = self.allocator.buffer(capacity: 0)
934-
try message.serialize(into: &messageBuffer)
935925
var buffer = self.allocator.buffer(capacity: 0)
936-
try writer.write(&messageBuffer, into: &buffer)
926+
try writer.write(message, into: &buffer)
937927

938928
var state: ReadState = .many()
939929
state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {
@@ -948,12 +938,10 @@ class ReadStateTests: GRPCTestCase {
948938
// Write a message into the buffer twice:
949939
let message = Echo_EchoRequest.with { $0.text = "Hello!" }
950940
let writer = LengthPrefixedMessageWriter(compression: .none)
951-
var messageBuffer = self.allocator.buffer(capacity: 0)
952-
try message.serialize(into: &messageBuffer)
953941
var buffer = self.allocator.buffer(capacity: 0)
954-
try writer.write(&messageBuffer, into: &buffer)
955-
try writer.write(&messageBuffer, into: &buffer)
956-
try writer.write(&messageBuffer, into: &buffer)
942+
try writer.write(message, into: &buffer)
943+
try writer.write(message, into: &buffer)
944+
try writer.write(message, into: &buffer)
957945

958946
var state: ReadState = .many()
959947
state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {

0 commit comments

Comments
 (0)