-
Notifications
You must be signed in to change notification settings - Fork 428
Adds the ability to use different payloads #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds the ability to use different payloads #710
Conversation
Sources/GRPC/ReadWriteStates.swift
Outdated
// Zero is fine: the writer will allocate the correct amount of space. | ||
var buffer = allocator.buffer(capacity: 0) | ||
|
||
do { | ||
try message.serialize(into: &buffer) | ||
} catch { | ||
self = .notWriting | ||
return .failure(.serializationFailed) | ||
} | ||
|
||
// Zero is fine: the writer will allocate the correct amount of space. | ||
var buffer = allocator.buffer(capacity: 0) | ||
let data = buffer.readData(length: buffer.readableBytes)! | ||
buffer.clear() | ||
writer.write(data, into: &buffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a prettier way of doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but unfortunately we need to rework the writer API slightly. I have a PR up at the moment which adds support for compression which complicates things a little more. For the time being I think making the writer generic over the payload and having it serialize the payload into the buffer is sufficient.
cb2755e
to
e4a5737
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great start @mustiikhalil! I've left some comments inline.
You may already be pretty close to the point where you can try out flatbuffers. If you look at e.g, the hello world example and switch the types to be flatbuffers and provide the conformance to Payload then it should work. Of course this is still missing all the flatbuffers codegen :)
Sources/GRPC/ReadWriteStates.swift
Outdated
// Zero is fine: the writer will allocate the correct amount of space. | ||
var buffer = allocator.buffer(capacity: 0) | ||
|
||
do { | ||
try message.serialize(into: &buffer) | ||
} catch { | ||
self = .notWriting | ||
return .failure(.serializationFailed) | ||
} | ||
|
||
// Zero is fine: the writer will allocate the correct amount of space. | ||
var buffer = allocator.buffer(capacity: 0) | ||
let data = buffer.readData(length: buffer.readableBytes)! | ||
buffer.clear() | ||
writer.write(data, into: &buffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but unfortunately we need to rework the writer API slightly. I have a PR up at the moment which adds support for compression which complicates things a little more. For the time being I think making the writer generic over the payload and having it serialize the payload into the buffer is sufficient.
@@ -149,3 +149,12 @@ extension Echo_EchoProvider { | |||
} | |||
} | |||
|
|||
extension Echo_EchoRequest: Payload { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is generated, but it looks like these were added manually.
The implementation for the conformance to Payload
shouldn't be in the generated code. Instead we can create another protocol (e.g. GRPCProtobufPayload
) which conforms to Payload
and Message
. It should then provide the default implementation for Payload
so that the generated messages can be extended to conform to that protocol.
You'll need to take a look at the code-gen (in protoc-gen-grpc-swift
) to figure out how to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@glbrntt Yes, I just added them to see if they would pass the test cases (since they are still failing) and to have a draft of the PR. So we can talk about What we would be doing, and how.
we can have the following:
protocol GRPCProtobufPayload: GRPCPayload {}
extension GRPCProtobufPayload {
// comes the implementation of the functions
}
and similarly in FlatBuffers:
protocol GRPCFlatbufPayload: GRPCPayload {}
extension GRPCFlatbufPayload {
// comes the implementation of the functions
}
do you think we should generate the code? or implement it which the package itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay, no problem. Just an FYI if you want to run the tests locally you can set the ENABLE_TIMING_TESTS
environment variable to false
to skip some of the longer running tests (it takes the test time down to a couple of seconds).
The default implementation for GRPCProtobufPayload
(which should also conform to SwiftProtobuf.Message
) should be within GRPC
since we already depend on SwiftProtobuf
. This gives us more flexibility to change the implementation (i.e. if/when new Protobuf APIs become available) without regenerating the code.
For flatbuffers I think we have to generate the conformance to GRPCPayload
each time since there's no dependency between flatbuffers and gRPC Swift here.
9439f1e
to
8827fca
Compare
@glbrntt I think this is ready for review now! Thanks a million for the guidance though, did learn new things! |
b045bce
to
287022f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this @mustiikhalil, I've left some comments inline.
The CI is failing because the committed code for the Echo service doesn't match the code generated for that service; you'll need to regenerate it and commit it again.
Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift
Outdated
Show resolved
Hide resolved
Sources/GRPC/GRPCServerCodec.swift
Outdated
var buffer = ByteBufferAllocator().buffer(capacity: 0) | ||
try message.serialize(into: &buffer) | ||
context.write(self.wrapOutboundOut(.message(buffer.readData(length: buffer.readableBytes)!)), promise: promise) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of this change, we have quite a few unnecessary copies here and on the client side (between Data
and ByteBuffer
). This might require a fair bit of refactoring; would you be interested in doing this? If not I'm happy to do this in a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would gladly do it but do you have a structure in mind or should I look into how we can do it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll try to explain the problem and what needs to be done to improve it.
Problem
As you know, before this change we explicitly expected payloads to be Message
from SwiftProtobuf. The only serialization and deserialization methods from Message
are via Data
from Foundation. Internally we use NIO and our currency type for storing bytes is NIOs ByteBuffer
.
Because we always expect a Data
from Message
, the write
method on LengthPrefixedMessageWriter
accepts a Data
and writes into a ByteBuffer
.
With the protocol we always serialize into a ByteBuffer
(for Message
we have to go via Data
first). This means that we've gone from:
Message
toData
(serialization)Data
toByteBuffer
(length prefixing)
to
Message
toData
(serialization)Data
toByteBuffer
(we need to forGRPCPayload
)ByteBuffer
toData
(we need to forLengthPrefixedMessageWriter.write
)Data
toByteBuffer
(length prefixing)
Clearly we don't want to copy bytes from Data
to ByteBuffer
and back from ByteBuffer
to Data
again (and we also don't want our API for GRPCPayload
to be defined by Data
).
Now I need to provide some background on the LenthPrefixedMessageWriter
: gRPC payloads are sent with a 5-byte prefix (I might have explained this before but I'll repeat it for completeness). The first byte indicates whether the payload is compressed or not, the next four bytes indicate the length of the payload (the compressed length if it is compressed).
If the payload isn't compressed then this is actually really straightforward: we write a 0
-byte we write the length of the payload as a UInt32
and then we write the payload. For compression we have to write the a 1
-byte, leave a 4-byte gap for the length, compress the payload into the buffer and not how many bytes it was and then fill in the 4-byte gap.
Possible Solution
So I think the best we can do now (I'll use protobuf as an example again) is actually to pre-allocate the extra 5-byte space at the beginning of the buffer before we pass it into message.serialize(into:)
.
If we aren't using compression then all we need to do is fill in the first 5 bytes, easy! (And we have the Message
to Data
to ByteBuffer
, as we had before, for Flatbuffers I expect we have something similar.)
When we use compression things are slightly different because we can't compress in place, so we have an extra buffer but in this case the same applies, we can avoid the extra hop via Data
.
Hopefully this all makes sense so far?
What needs to be done?
LengthPrefixedMessageWriter.write
accepts aData
, instead I think the writer should accept aGRPCPayload
(as the writer knows whether compression is being used so knows how to fill in the first 5-bytes of the payload etc, it can also choose whether it should preallocate 5 bytes, if we're using compression then we don't need to since we have to compress into a different buffer anyway).Zlib.deflate
also accepts aData
if I recall correctly, this will need to be changed to aByteBuffer
On the server side of things, this will require having to move some components around. At the moment the length prefixed writer lives in HTTP1ToRawGRPCServerCodec
; this will have to move to GPRCServerCodec
. This means the types that get sent between these handlers will also have to change (_RawGRPCServerResponsePart.message
will have a ByteBuffer
as its payload which will be a length prefixed message). I also expect a number of tests will have to be updated to support these changes.
167989f
to
75f3133
Compare
I've fixed all the suggested things, and also refactored the code according to what we planned yesterday, however there is a point in the
Furthermore, I ran both commands locally and there is no difference between both I am not sure why the CLI is having issues with it :/
|
Great, I'll take another look now.
Oh I think I see the problem, the CI looks like it isn't actually rebuilding the plugin (maybe the workspace gets cached between different commits for the same PR?!). If you update the rule in the ${PROTOC_GEN_GRPC_SWIFT}: Sources/protoc-gen-grpc-swift/*.swift |
let's see hopefully it works. |
… with GRPC-swift Renames all the Requests and Responses + adds comments + fixes generator code Updated the code to only use bytebuffer instead of foundation data Move the logic for reading the payload to GRPCServerCodec, and fixed the logic Updated the rule for travis ci
98ef91b
to
19f2604
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left some more comments @mustiikhalil -- great work!
Sources/GRPCInteroperabilityTestModels/Generated/test.grpc.swift
Outdated
Show resolved
Hide resolved
3f8effa
to
00f9f91
Compare
…ming + generated code
95bcae7
to
b180af7
Compare
1135358
to
ff56fe4
Compare
ff56fe4
to
1cfb34c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more changes needed but this is getting pretty close though @mustiikhalil.
try self.stream.deflate( | ||
outputBuffer: CGRPCZlib_castVoidToBytefPointer(outputPointer.baseAddress!), | ||
outputBufferSize: outputPointer.count | ||
) | ||
} | ||
|
||
let bytesRead = inputPointer.count - self.stream.availableInputBytes | ||
return (bytesRead, writtenBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@glbrntt I think this should fix the issue with the pointer being accessed while we already moved out of the function
df14977
to
9ffa268
Compare
buffer.reserveCapacity(MemoryLayout.size(ofValue: payload) + LengthPrefixedMessageWriter.metadataLength) | ||
|
||
func write(_ payload: GRPCPayload, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {r | ||
buffer.reserveCapacity(buffer.writerIndex + LengthPrefixedMessageWriter.metadataLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved the code to use the buffer.writerIndex
instead of the MemoryLayout.size(of: payload)
since I think this way it will be more robust knowing that it will always have those 5 extra bytes on the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks!
9ffa268
to
c94ca28
Compare
c94ca28
to
052f513
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, thanks for sticking with this @mustiikhalil; great job!
buffer.reserveCapacity(MemoryLayout.size(ofValue: payload) + LengthPrefixedMessageWriter.metadataLength) | ||
|
||
func write(_ payload: GRPCPayload, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {r | ||
buffer.reserveCapacity(buffer.writerIndex + LengthPrefixedMessageWriter.metadataLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks!
Wonderful! |
Good morning!
The following PR will be moving the
Message Protocol
away from the code base and introduces thePayload protocol
which will allow users, to decode messages according to the Type they want to send. it addresses the following issue: #708, @glbrntt let's see how we can make this better since I found the following issue:which is not needed all the time since the following object didn't require it to work