Remove buf copy when the compressor exist #1427
rpc_util.go
Outdated
length uint
b []byte
length uint
payloadLen = 1
These two should be const instead.
rpc_util.go
Outdated
}

if length > math.MaxUint32 {
This is unlikely to even trigger if there is a 4GB message. length should be a uint64 if we're serious about this.
If the size can be larger than 4GB, we need to add one more byte (or 2 to 3 bits?) to the header of every message sent to hold the length of the message. Is that worth it?
That's not an option -- we must follow the protocol, which is 4 bytes to encode the length.
My point is about the type of the variables. Checking whether a uint is > MaxUint32 will only work on platforms where a uint is 64 bits.
tl;dr: please make length a uint64.
Done
rpc_util.go
Outdated
if cp != nil {
	// pre-alloc place
	cbuf.Write(make([]byte, 5))
Be consistent about your use of constants -- use "payloadLen+sizeLen" where you have 5.
rpc_util.go
Outdated
// Write payload format
b[0] = byte(compressionMade)
// Write length of b into buf
binary.BigEndian.PutUint32(b[1:], uint32(length))
1->payloadLen
rpc_util.go
Outdated
var buf []byte
How about inverting things to share the length-writing and wirelength code?
    var buf []byte
    if cp == nil {
        buf = make([]byte, payloadLen+sizeLen+len(b))
        buf[0] = byte(compressionNone)
    } else {
        buf = b
        buf[0] = byte(compressionMade)
    }
    binary.BigEndian.PutUint32(buf[1:], uint32(length))
    if outPayload != nil {
        outPayload.WireLength = len(buf)
    }
    return buf, nil
You are right. I thought buf = b would allocate a new buffer for buf, but it turns out it doesn't.
Since len(b) returns an int, the check length > math.MaxUint32 seems wrong,
because it will always return false.
Good point. Only on those systems where "int" is 32 bits. Presumably
those systems can't have slices bigger than 4GB, so I suppose this is
fine. Why not leave "length" an int (instead of a uint, with casting),
then?
call.go
Outdated
@@ -102,18 +102,21 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
Client: true,
}
}
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
outBuf, outData, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
rename "outBuf" to "outHdr" please, to represent what it has. Or just "hdr" and "data" would be fine, too -- it might look a bit nicer that way.
Please make a similar change everywhere encode is called.
Done
call.go
Outdated
if err != nil {
	return err
}
if c.maxSendMessageSize == nil {
	return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
if len(outBuf)+len(outData) > *c.maxSendMessageSize {
I don't think it's well-specified, but I would say actually this check should not include the header. So len(outData) alone is better.
Please make a similar change everywhere encode is called.
Done
call.go
Outdated
if err == nil && outPayload != nil {
opts.Last = false
errHeader := t.Write(stream, outBuf, opts)
opts.Last = true
This should be saved before setting it to false (on line 115) and restored to its previous value here instead of set to true.
There is the same problem in another spot, too, please fix all of them.
Done
call_test.go
Outdated
if err != nil {
	t.Errorf("Failed to encode the response: %v", err)
	return
}
h.t.Write(s, reply, &transport.Options{})
h.t.Write(s, replyData, &transport.Options{})
This creates another allocation. This would be better:
o := &transport.Options{}
h.t.Write(..., o)
h.t.Write(..., o)
This occurs below, too.
Done
rpc_util.go
Outdated
@@ -289,59 +289,53 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt

// encode serializes msg and prepends the message header. If msg is nil, it
This comment needs updating.
Done
rpc_util.go
Outdated
}

length = len(b)
Let's just remove "length" now. len(b) is fine.
Done
rpc_util_test.go
Outdated
@@ -109,7 +109,8 @@ func TestEncode(t *testing.T) {
}{
{nil, nil, []byte{0, 0, 0, 0, 0}, nil},
This hardly tests anything!
Done. Assert both header and data.
Updated end2end_test.go to make it pass on Travis. Can you help me review it again?
call.go
Outdated
if err != nil {
	return err
}
if c.maxSendMessageSize == nil {
	return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
	return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
if len(hdr)+len(data) > *c.maxSendMessageSize {
I think we said we wouldn't include the header as the message for this check. (Also the error message would be out of sync.)
Done.
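A sketch of the size check as the reviewer describes it, counting only the message data and keeping the error text in sync with the value being compared. The helper checkSendSize and the constant hdrLen are hypothetical; the error wording follows the message quoted in the diff above.

```go
package main

import "fmt"

const hdrLen = 5 // 1-byte flag plus 4-byte length, as discussed in the review

// checkSendSize is a hypothetical helper. It compares only the message data
// against the limit; the header is sent on the wire but does not count.
func checkSendSize(data []byte, maxSendMessageSize int) error {
	if len(data) > maxSendMessageSize {
		return fmt.Errorf("grpc: trying to send message larger than max (%d vs. %d)",
			len(data), maxSendMessageSize)
	}
	return nil
}

func main() {
	hdr := make([]byte, hdrLen)
	data := make([]byte, 100)
	// hdr+data is 105 bytes on the wire, yet a limit of 100 is satisfied.
	fmt.Println(len(hdr)+len(data), checkSendSize(data, 100))
}
```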
rpc_util.go
Outdated
if len(b) < secondStart {
	secondStart = len(b)
}
bufHeader = append(bufHeader, b[:secondStart]...)
Oh, I see what happened here.
Please keep outputting this the way it was before: only the header + all of the message data.
Then pass them into the transport.Write function as two slices (instead of making two separate calls), then combine within there. This way, the knowledge that a frame is 16kb is contained within the transport, which is desirable as it's really a function of the transport not gRPC.
Done. Transport.Write will append data to the header slice.
transport/http2_client.go
Outdated
func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
r := bytes.NewBuffer(data)
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
secondStart := 16379 // 16384 - payloadLen - sizeLen
I think this should be http2MaxFrameLen - len(hdr), to avoid presuming the length of the header.
Done.
transport/http2_client.go
Outdated
if isLastSlice && opts.Last {
	endStream = true
} else if !isLastSlice && len(data) != 0 {
	r = bytes.NewBuffer(data)
r.Reset(data)
Not done. I need some time to check the overhead of using the slice directly.
transport/http2_client.go
Outdated
@@ -763,11 +767,19 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
t.writableChan <- 0
continue
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && isLastSlice {
How about moving this after the new if r.Len() == 0 that you added so you don't need to check isLastSlice?
Done.
@@ -821,8 +821,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
....soooo much duplication :/
All of the comments in http2Client.Write apply here as well.
(Don't try to fix the duplication in this PR.)
This reverts commit 01089b2.
grpc#1427 avoided the allocation of an extra buffer by re-using the buffer generated by the Compressor. It did so by separating out the header and data buffers and delegating to the transport layer the chunking of the two into appropriately sized frames. At the transport layer, however, given the preference for generating 16KB frames, we re-concatenated the header and data buffers and subsequently flushed out contiguous chunks. Given that the header buffer was always 5 bytes in size, merging the data bytes into the header buffer necessarily involved allocations and copying. In practice, for every {Client,Server}Transport.Write we would allocate and copy from 0 to ~16KB of the data buffer into the header buffer. By reverting grpc#1427 and re-working the implementation of rpc_util.go:encode, we can stage a single buffer composed of the serialized{,compressed} message with the message header prepended. The transport layer is again agnostic to the exact delineation and can simply write out 16KB chunks as needed without having to copy/allocate.
Re-use the buffer used by the compressor. There is no need to create a new "buf" and copy the old data into it.